ATRVjr Reinforcement Learning#

This tutorial extends Cart Pole Reinforcement Learning by training a model to drive an ATRVjr vehicle to a target. This tutorial consists of 2 parts: building the environment and training a model on the environment.

Requirements:

In this tutorial we will:

  • Implement gymnasium.Env

    • Create the multibody

    • Create Environment

    • Setup the simulation

    • Environment attributes

    • Setup helpers

    • Reset the environment

    • Step the environment

  • Train the model

    • Load the environment

    • Setup callbacks

    • Train a SAC model

    • Test model performance

Scripts:

For more in-depth descriptions of kdFlex concepts, see usage.

First let’s verify that the reinforcement learning packages are installed:

try:
    import stable_baselines3
    import gymnasium
except ModuleNotFoundError:
    raise ModuleNotFoundError(
        "Please ensure that the following python packages are installed:\n"
        "    stable_baselines3 gymnasium"
    )

Implement gymnasium.Env#

Because classes can’t be split across notebook cells, we will show snippets of the code with explanations. The full class can be found in env.py.

from typing import Optional, cast
import numpy as np
import gymnasium as gym
from pathlib import Path
from math import pi, sqrt

from Karana.Frame import FrameContainer
from Karana.Core import discard, allDestroyed
from Karana.Dynamics.SOADyn_types import (
    MultibodyDS,
    PinSubhingeDS,
    Linear3SubhingeDS,
    SphericalSubhingeDS,
    HingeDS,
    BodyWithContextDS,
    ScenePartSpecDS,
)
from Karana.Dynamics import (
    Multibody,
    HingeType,
    StatePropagator,
    TimedEvent,
    PhysicalBody,
    PhysicalHinge,
)
from Karana.Math import UnitQuaternion, HomTran
from Karana.Scene import (
    BoxGeometry,
    CylinderGeometry,
    Color,
    PhysicalMaterialInfo,
    PhysicalMaterial,
    LAYER_GRAPHICS,
    LAYER_ALL,
    LAYER_COLLISION,
    Texture,
)
from Karana.Scene import ProxyScenePart, ProxyScene
from Karana.Scene import CoalScene
from Karana.Collision import FrameCollider
from Karana.Models import (
    Gravity,
    UniformGravity,
    UpdateProxyScene,
    SyncRealTime,
    PenaltyContact,
    OutputUpdateType,
    HuntCrossley,  # contact model used in init_sim below (assumed to be exported from Karana.Models)
)
from Karana.Math import IntegratorType


# helper to wrap angles
def wrap_to_pi(x):
    return (x + np.pi) % (2 * np.pi) - np.pi

Create the Multibody#

We create and edit our multibody by importing a URDF file into a SOADyn_types.MultibodyDS. This is the same procedure as in ATRVjr driving.

def createMbody(fc: FrameContainer) -> Multibody:
    urdf_file = Path().absolute().parent / "resources" / "atrvjr" / "atrvjr.urdf"
    multibody_info = MultibodyDS.fromUrdf(urdf_file)
    # convert to full6DOF hinge between root frame and rover.
    multibody_info.base_bodies[0].body.hinge = HingeDS(
        hinge_type=HingeType.FULL6DOF,
        subhinges=[
            Linear3SubhingeDS(prescribed=False),
            SphericalSubhingeDS(prescribed=False),
        ],
    )

    # unlock locked Hinges for wheels from the URDF
    for i in [0, 1]:
        multibody_info.base_bodies[0].children[i].body.hinge = HingeDS(
            hinge_type=HingeType.REVOLUTE,
            subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, 1.0]))],
        )
    for i in [2, 3]:
        multibody_info.base_bodies[0].children[i].body.hinge = HingeDS(
            hinge_type=HingeType.REVOLUTE,
            subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, -1.0]))],
        )
        
    # Remove all collision meshes. We want to add these manually.
    def removeCollision():
        def _removeParts(parts: list[ScenePartSpecDS]):
            idx = []
            for k,p in enumerate(parts):
                if p.layers == LAYER_COLLISION:
                    idx.append(k)
            idx.reverse()
            for k in idx:
                parts.pop(k)

        def _recurse(b: BodyWithContextDS):
            _removeParts(b.body.scene_parts)
            for c in b.children:
                _recurse(c)

        for base in multibody_info.base_bodies:
            _recurse(base)

    removeCollision()

    multibody = multibody_info.toMultibody(fc)
    multibody.ensureHealthy()
    multibody.resetData()
    assert multibody.isReady()
    return multibody

Create Environment#

Here, we attach cylinder geometries for the wheels. Additionally, we set up the ground and apply a custom grid texture from a .png file using Karana.Scene.Texture.lookupOrCreateTexture(). We also add walls on all four sides so that driving out of bounds can be detected as a collision.

def createEnvironment(
    size: int, mb: Multibody, proxy_scene: ProxyScene
) -> tuple[PhysicalBody, PhysicalBody, PhysicalBody, PhysicalBody]:
    thickness = 0.25
    height = 2.5
    mat_info = PhysicalMaterialInfo()
    mat_info.color = Color.GRAY
    gray = PhysicalMaterial(mat_info)
    # add wheels to our atrv model
    wheel_geom = CylinderGeometry(0.125, 0.075)
    left_front_wheel = mb.getBody("left_front_wheel_link")
    left_front_wheel_node = ProxyScenePart(
        name="left_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom, material=gray
    )
    left_front_wheel_node.attachTo(left_front_wheel)
    left_front_wheel_node.setTranslation([0, 0, 0.03])
    left_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))

    left_rear_wheel = mb.getBody("left_rear_wheel_link")
    left_rear_wheel_node = ProxyScenePart(
        name="left_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom, material=gray
    )
    left_rear_wheel_node.attachTo(left_rear_wheel)
    left_rear_wheel_node.setTranslation([0, 0, 0.03])
    left_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))

    right_front_wheel = mb.getBody("right_front_wheel_link")
    right_front_wheel_node = ProxyScenePart(
        name="right_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom, material=gray
    )
    right_front_wheel_node.attachTo(right_front_wheel)
    right_front_wheel_node.setTranslation([0, 0, 0.03])
    right_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))

    right_rear_wheel = mb.getBody("right_rear_wheel_link")
    right_rear_wheel_node = ProxyScenePart(
        name="right_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom, material=gray
    )
    right_rear_wheel_node.attachTo(right_rear_wheel)
    right_rear_wheel_node.setTranslation([0, 0, 0.03])
    right_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))

    # add ground
    ground_geom = BoxGeometry(size, size, thickness)
    mat_info.color = Color.WHITE
    texture_file = Path().absolute().parent / "resources" / "atrvjr" / "grid.png"
    grid_texture = Texture.lookupOrCreateTexture(str(texture_file))
    mat_info.color_map = grid_texture
    grid = PhysicalMaterial(mat_info)
    ground = ProxyScenePart(
        "ground", scene=proxy_scene, geometry=ground_geom, material=grid, layers=LAYER_ALL
    )
    ground.setTranslation([0, 0, -0.1])

    # add walls
    mat_info.color = Color.KHAKI
    mat_info.color_map = None
    khaki = PhysicalMaterial(mat_info)
    wall_geom = BoxGeometry(size, thickness, height)
    north_wall = ProxyScenePart(
        "north_wall", scene=proxy_scene, geometry=wall_geom, material=khaki, layers=LAYER_ALL
    )
    north_wall.setTranslation([size / 2.0 + thickness / 2, 0, height / 2 - thickness])
    north_wall.setUnitQuaternion(UnitQuaternion([1 / sqrt(2), 1 / sqrt(2), 0, 0]))
    south_wall = ProxyScenePart(
        "south_wall", scene=proxy_scene, geometry=wall_geom, material=khaki, layers=LAYER_ALL
    )
    south_wall.setTranslation([-size / 2.0 - thickness / 2, 0, height / 2 - thickness])
    south_wall.setUnitQuaternion(UnitQuaternion([-1 / sqrt(2), 1 / sqrt(2), 0, 0]))
    west_wall = ProxyScenePart(
        "west_wall", scene=proxy_scene, geometry=wall_geom, material=khaki, layers=LAYER_ALL
    )
    west_wall.setTranslation([0, size / 2.0 + thickness / 2, height / 2 - thickness])
    east_wall = ProxyScenePart(
        "east_wall", scene=proxy_scene, geometry=wall_geom, material=khaki, layers=LAYER_ALL
    )
    east_wall.setTranslation([0, -size / 2.0 - thickness / 2, height / 2 - thickness])

    return left_front_wheel, left_rear_wheel, right_front_wheel, right_rear_wheel

Initialize Attributes#

Similar to Cart Pole, it is necessary to implement a few key attributes. Here is the breakdown for this scenario:

  • observation space: The agent is able to see the target position, car position, car linear velocities, car roll, car yaw velocity, sine of the angle deviation from target, and the cosine of the angle deviation.

    • roll is included so the agent can tell when it is turning sharply enough to risk flipping over

    • it is common to encode angles as their sine and cosine to keep the observation continuous: a jump between -3.14 and 3.14 radians (which represent the same heading) can destabilize training (see the short check after this list)

    • minimum and maximum velocities were found through direct testing

  • action space: The agent selects 2 continuous values between -1 and 1, which are scaled by self._torque_magnitude and applied as torques to the left-side and right-side wheels.

  • np_random is a random number generator which can be seeded for reproducibility.

  • render_mode determines whether or not to show graphics visualizations. “none” is faster but shows no graphics, while “human” renders the web scene.
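
As a quick check of the sine/cosine encoding mentioned above, compare two headings just on either side of ±π: the raw angle values look far apart, while their sine/cosine pairs are nearly identical (illustrative numbers only):

import numpy as np

a, b = 3.10, -3.10                 # nearly the same heading, on either side of +/- pi
print(abs(a - b))                  # 6.20 -> looks like a large jump to the network
print(abs(np.sin(a) - np.sin(b)))  # ~0.08 -> small
print(abs(np.cos(a) - np.cos(b)))  # 0.0  -> identical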

class ATRVEnv(gym.Env):
    metadata = {"render_modes": ["human", "none"]}

    def __init__(
        self,
        render_mode: Optional[str] = None,
        port: int = 0,
        sync_real_time: bool = False,
        **kwargs,
    ):
        self.render_mode = "none"
        if render_mode and render_mode in self.metadata["render_modes"]:
            self.render_mode = render_mode
        self.size = 30
        self._bound = self.size / 2.0 - 3.0
        self._torque_magnitude = 20.0

        # initialize the simulation
        self.init_sim(port, sync_real_time)

        # [txpos, typos, x, y, vx, vy, roll, yaw_vel, sin(angle_diff), cos(angle_diff)]
        low = np.array(
            [
                -self._bound,
                -self._bound,
                -self.size,
                -self.size,
                -6.0,
                -6.0,
                -0.25,
                -6.0,
                -1.0,
                -1.0,
            ],
            dtype=np.float32,
        )
        high = np.array(
            [self._bound, self._bound, self.size, self.size, 6.0, 6.0, 0.25, 6.0, 1.0, 1.0],
            dtype=np.float32,
        )
        self.observation_space = gym.spaces.Box(low=low, high=high, dtype=np.float32)

        # define action space
        # torque on [left, right]
        self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)

        # initialize class variables
        self._target = None
        self._left = None
        self._right = None
        self._target_pos = None
        self._agent_pos = None
        self._agent_vel = None
        self._agent_roll = None
        self._agent_yaw_vel = None
        self._angle_diff = None
        self.np_random = None

Setup the Simulation#

Here the environment follows the same simulation setup procedure as the other notebooks. This time, the Karana.Frame.ChainedFrameToFrame as well as the 6 degree of freedom hinge between the origin and the multibody are saved.

  • Karana.Frame.ChainedFrameToFrame (self._f2f) allows querying of the relative pose, spatial velocity, and spatial acceleration. Since it is defined between the world origin and the ATRVjr multibody, it queries the world coordinates of the multibody.

  • Karana.Dynamics.PhysicalHinge (self._root_hinge) consists of a full 3D positional and 3D angular hinge between the world origin and the ATRVjr multibody. By setting the linear and angular subhinge values, the pose, velocity, and acceleration of the multibody can be set.

The StatePropagator callback functions apply the wheel torques and check for termination.

  • check_termination(t, x) will terminate the episode if the car has flipped over or collided with the walls.

  • pre_deriv_fn(t, x) applies torques to move the wheels.

    def init_sim(self, port: int = 0, sync_real_time: bool = False):
        # setup bodies and scene
        self._fc = FrameContainer("root")
        self._mb = createMbody(self._fc)

        # attach our multibody's to the root frame
        self._f2f = cast(PhysicalBody, self._mb.virtualRoot()).frameToFrame(
            self._mb.getBody("center_link")
        )
        self._root_hinge = cast(PhysicalHinge, self._mb.getBody("center_link").parentHinge())

        # setup graphics if enabled
        if self.render_mode == "human":
            self._cleanup_graphics, viz_scene = self._mb.setupGraphics(axes=0.5, port=port)
            viz_scene.defaultCamera().pointCameraAt(
                [-0.8 * self.size, 0.0, self.size], [0, 0, 0], [0, 0, 1]
            )
            self._proxy_scene = self._mb.getScene()
        else:
            self._proxy_scene = ProxyScene(f"{self._mb.name()}_proxyscene", self._mb.virtualRoot())

        # setup wheels, ground, and walls
        left_front_wheel, left_rear_wheel, right_front_wheel, right_rear_wheel = createEnvironment(
            self.size, self._mb, self._proxy_scene
        )

        # setup collision client
        col_scene = CoalScene("collision_scene")
        self._proxy_scene.registerClientScene(
            col_scene, self._mb.virtualRoot(), layers=LAYER_COLLISION
        )

        # setup state propagator and models
        self._sp = StatePropagator(self._mb, integrator_type=IntegratorType.RK4)

        # gravity model
        ug = Gravity("grav_model", self._sp, UniformGravity("uniform_gravity"), self._mb)
        ug.getGravityInterface().setGravity(np.array([0, 0, -9.81]), 0.0, OutputUpdateType.PRE_HOP)
        del ug

        # for visualization
        if sync_real_time:
            SyncRealTime("sync_real_time", self._sp, 1.0)

        # update proxy scene model
        UpdateProxyScene("update_proxy_scene", self._sp, self._proxy_scene)

        # collision model
        hc = HuntCrossley("hunt_crossley_contact")
        hc.params.kp = 100000
        hc.params.kc = 20000
        hc.params.mu = 0.3
        hc.params.n = 1.5
        hc.params.linear_region_tol = 1e-3
        PenaltyContact("penalty_contact", self._sp, self._mb, [FrameCollider(self._proxy_scene, col_scene)], hc)
        del hc

        def check_termination(t, x):
            # check if the car is flipping over
            flip = abs(self._f2f.relTransform().getUnitQuaternion().toEulerAngles().alpha()) > 0.2
            # if more than our 4 wheels are in contact, we have a collision
            if len(col_scene.cachedCollisions()) > 4 or flip:
                self._terminated = True
                return True
            return False

        def pre_deriv_fn(t, x):
            left_front_wheel.parentHinge().subhinge(0).setT(self._left)
            left_rear_wheel.parentHinge().subhinge(0).setT(self._left)
            right_front_wheel.parentHinge().subhinge(0).setT(self._right)
            right_rear_wheel.parentHinge().subhinge(0).setT(self._right)

        self._sp.fns.pre_deriv_fns["apply_torque"] = pre_deriv_fn
        self._sp.fns.terminate_advance_to_fns["check_termination"] = check_termination

Setup Helpers#

These are a few helper functions for gymnasium. _get_obs composes the observation values into a numpy array for a model to train on, and _get_info is primarily for user debugging.

    def _get_obs(self):
        return np.concatenate(
            (
                self._target_pos,
                self._agent_pos,
                self._agent_vel,
                self._agent_roll,
                self._agent_yaw_vel,
                np.sin(self._angle_diff),
                np.cos(self._angle_diff),
            )
        ).astype(np.float32)

    # use non-normalized values for info
    def _get_info(self):
        return {
            "target_pos": self._target_pos,
            "agent_pos": self._agent_pos,
            "agent_vel": self._agent_vel,
            "agent_roll": self._agent_roll,
            "agent_yaw_vel": self._agent_yaw_vel,
            "angle_diff": self._angle_diff,
        }

    def close(self):
        del self._proxy_scene, self._f2f, self._root_hinge, self._target
        discard(self._sp)
        if self.render_mode == "human":
            self._cleanup_graphics()
        discard(self._mb)
        discard(self._fc)
        assert allDestroyed()

Reset the Environment#

As explained in Cart Pole, this method refreshes the environment to a new scenario at the start of every episode. Here, the stored attributes are reset, and the multibody is placed at a random position and yaw orientation using its root hinge. The blue target is also moved to a random position on the board.

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        if self.np_random is None:
            self.np_random, _ = gym.utils.seeding.np_random(seed)

        # initialize values
        self._terminated = False
        self._left = 0.0
        self._right = 0.0
        self._target_pos = self.np_random.uniform(-self._bound, self._bound, size=(2,)).astype(
            np.float32
        )
        self._agent_pos = self.np_random.uniform(-self._bound, self._bound, size=(2,)).astype(
            np.float32
        )
        self._agent_vel = np.zeros(2, dtype=np.float32)
        self._agent_roll = np.zeros(1, dtype=np.float32)
        self._agent_yaw_vel = np.zeros(1, dtype=np.float32)
        agent_yaw = self.np_random.uniform(-np.pi, np.pi, size=(1,)).astype(np.float32)

        # calculate previous distance and angle difference
        difference = self._target_pos - self._agent_pos
        ideal_angle = np.arctan2(difference[1], difference[0])
        self._distance = np.linalg.norm(difference)
        self._angle_diff = wrap_to_pi(ideal_angle - agent_yaw)

        # reset the rover position and velocity
        self._mb.resetData()
        orientation = UnitQuaternion(agent_yaw[0], [0.0, 0.0, 1.0])
        self._root_hinge.fitQ(
            HomTran(q=orientation, vec=[self._agent_pos[0], self._agent_pos[1], 0])
        )

        # add the target
        if not self._target:
            mat_info = PhysicalMaterialInfo()
            mat_info.color = Color.BLUE
            blue = PhysicalMaterial(mat_info)

            # create the target
            target_geom = CylinderGeometry(0.25, 0.05)
            self._target = ProxyScenePart(
                "target_part",
                scene=self._proxy_scene,
                geometry=target_geom,
                material=blue,
                layers=LAYER_GRAPHICS,
            )
        self._target.setTranslation([self._target_pos[0], self._target_pos[1], 0.25])
        self._target.setUnitQuaternion(UnitQuaternion([1 / sqrt(2), 0, 0, 1 / sqrt(2)]))

        # simulation
        t_init = np.timedelta64(0, "ns")
        self._sp.setTime(t_init)
        self._sp.setState(self._sp.assembleState())

        # setup timed event (every 0.01s)
        h = np.timedelta64(int(1e7), "ns")
        t = TimedEvent("hop_size", h, lambda _: None, False)
        t.period = h
        self._sp.registerTimedEvent(t)
        del t
        self._proxy_scene.update()
        return self._get_obs(), self._get_info()

Step the Environment#

After every model decision, the simulation is advanced and the current multibody pose and velocities are queried using the FrameToFrame. These values will be passed as part of the observation.

This reward function gives 4.0 for reaching the target and -4.0 if the episode is terminated by a wall collision or flip. If neither terminating condition is met, it uses these criteria:

  • progress: how much closer the car has gotten to the target compared to the last timestep

  • angle_progress: how much closer the car’s front is to facing the target compared to the last timestep

  • step penalty: a small amount subtracted per step to incentivize the car to reach the target as quickly as possible
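
For example, under these rules, if in one step the car gets 0.4 m closer to the target and reduces its heading error by 0.1 rad, the step reward is 0.4 + 4 × 0.1 - 0.03 = 0.77 (illustrative numbers).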

    def step(self, action: np.ndarray):
        torque = action * self._torque_magnitude
        self._left = torque[0]
        self._right = torque[1]
        self._sp.advanceBy(0.05)
        T = self._f2f.relTransform()
        self._agent_pos = T.getTranslation()[:2]
        q = T.getUnitQuaternion()
        ea = q.toEulerAngles()
        agent_yaw = np.array([ea.gamma()])
        self._agent_roll = np.array([ea.alpha()])
        sp_vel = self._f2f.relSpVel().toVector6()
        self._agent_vel = sp_vel[3:5]
        self._agent_yaw_vel = np.array([sp_vel[2]])

        # check termination
        difference = self._target_pos - self._agent_pos
        distance = np.linalg.norm(difference)
        ideal_angle = np.arctan2(difference[1], difference[0])
        angle_diff = wrap_to_pi(ideal_angle - agent_yaw)
        reward = 0.0
        if bool(distance < 1.0):
            self._terminated = True
            reward += 4.0
        elif self._terminated:
            reward -= 4.0
        else:
            # reward calculations
            progress = self._distance - distance
            angle_progress = np.abs(self._angle_diff) - np.abs(angle_diff)
            reward += progress
            reward += 4 * angle_progress
            reward -= 0.03  # step penalty
            self._distance = distance
            self._angle_diff = angle_diff

        # observation, reward, terminated, truncated, data
        return self._get_obs(), float(reward), self._terminated, False, self._get_info()

Train the model#

Awesome, now that the environment is set up, training can begin. This tutorial also uses Stable Baselines3, so make sure these dependencies are installed:

pip install stable-baselines3
pip install gymnasium
import os
from env import ATRVEnv
import gymnasium as gym
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.monitor import Monitor

gym.register(
    id="kdFlex-ATRVjr",
    entry_point=ATRVEnv,
    max_episode_steps=200,
)

Load the Environment#

Here, we load the training environment that we registered with gymnasium above and wrap it with DummyVecEnv to make it compatible with Stable Baselines3. DummyVecEnv takes a list of environment constructor functions, so several environments can be batched in a single process; it is also possible to parallelize across processes with SubprocVecEnv (see the sketch after the next code block).

There are also 2 other wrappers we use.

  • Monitor records per-episode statistics (reward, length) used during training and evaluation

  • VecNormalize automatically normalizes the observations and rewards, since training can be unstable with large or highly varied values.

train_env = gym.make("kdFlex-ATRVjr")
train_env = Monitor(train_env)
train_env = DummyVecEnv([lambda: train_env])
train_env = VecNormalize(train_env)
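
The snippet above batches a single environment. As a sketch only (not used in the rest of this tutorial), several headless environments could be run in parallel processes with SubprocVecEnv, assuming each one can be constructed independently; the worker count here is illustrative:

from stable_baselines3.common.vec_env import SubprocVecEnv

def make_env():
    # each worker process builds its own headless environment
    return Monitor(gym.make("kdFlex-ATRVjr"))

if __name__ == "__main__":  # guard required on platforms that spawn subprocesses
    parallel_env = SubprocVecEnv([make_env for _ in range(4)])
    parallel_env = VecNormalize(parallel_env)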

Setup Callbacks#

A common technique is to evaluate the model periodically on a separate evaluation environment and to stop training once performance has not improved for a specified number of evaluations, which helps prevent overfitting. This is accomplished by adding an EvalCallback, sketched below.

  • EvalCallback also saves the best model into ./models/best_model.zip
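
A minimal sketch of this setup, assuming the environment registered above; the evaluation frequency and patience values are illustrative, and the resulting eval_callback would be passed to model.learn(..., callback=eval_callback):

from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnNoModelImprovement

# separate evaluation environment, wrapped the same way as the training environment
eval_env = DummyVecEnv([lambda: Monitor(gym.make("kdFlex-ATRVjr"))])
eval_env = VecNormalize(eval_env, training=False, norm_reward=False)

# stop training once the best evaluation reward has not improved for 10 evaluations
stop_callback = StopTrainingOnNoModelImprovement(max_no_improvement_evals=10, min_evals=10, verbose=1)

# evaluate every 5000 steps and save the best model to ./models/best_model.zip
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="./models",
    eval_freq=5000,
    callback_after_eval=stop_callback,
    verbose=1,
)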

Train a SAC Model#

Because the action space is continuous, a Soft Actor Critic (SAC) model is used with the default hyperparameters.

model = SAC(
    "MlpPolicy",
    train_env,
)
# check if this is running in a test environment
if os.getenv("DTEST_RUNNING"):
    # in a testing environment, run for less
    model.learn(total_timesteps=1000)
else:
    # takes around 30 minutes
    model.learn(total_timesteps=500000)

Test the Model#

Because the model was trained on normalized observation values, it is important that the test environment also uses normalized values. Therefore, the normalization statistics are first saved to ./models/vec_normalize.pkl and then loaded into the test environment.

# save the normalization statistics
train_env.save("./models/vec_normalize.pkl")
train_env.close()
test_env = gym.make("kdFlex-ATRVjr", render_mode="human", sync_real_time=True)
test_env = DummyVecEnv([lambda: test_env])
test_env = VecNormalize.load("./models/vec_normalize.pkl", test_env)
test_env.training = False
test_env.norm_reward = False
terminated = False
obs = test_env.reset()
while not terminated:
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, info = test_env.step(action)

If you have difficulty training the model, a working model is also saved inside ./models/demo_models and loaded below.

test_env.close()
test_env = gym.make("kdFlex-ATRVjr", render_mode="human", sync_real_time=True)
test_env = DummyVecEnv([lambda: test_env])
test_env = VecNormalize.load("./models/demo_vec_normalize.pkl", test_env)
test_env.training = False
test_env.norm_reward = False
model = SAC.load("./models/demo_model", env=test_env)
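
# roll out the pretrained demo model (mirrors the evaluation loop above)
terminated = False
obs = test_env.reset()
while not terminated:
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, info = test_env.step(action)
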
# cleanup
test_env.close()
# hard process exit
import os

os._exit(0)

Summary#

By following Gymnasium’s environment standards, you have implemented and solved a more complicated driving scenario. Now you can leverage kdFlex’s low computational cost to tackle increasingly difficult problems. Good luck!