ATRVjr ROS Integration#

In this notebook we import the ATRVjr rover model into kdflex and interface with ROS. We will publish messages to ROS as the sim advances as well as listen for a ROS message to command the vehicle’s driving mode.

Requirements:

In this tutorial we will:

  • Import and modify the URDF

  • Setup the kdFlex scene

  • Attach wheel geometries

  • Setup StatePropagator and models

  • Setup the Simulation

  • Integrate with ROS

  • Run and clean up

Acknowledgements
The robot URDF model included in this project is Copyright (c) 2018 diasdm and licensed under the BSD 2‑Clause License (link). Some modifications were made to adapt the model for use within the kdflex simulation framework, including simplifying wheel geometries due to current rendering and collision handling limitations.

For a more in-depth descriptions of kdflex concepts see usage.

First let’s confirm that ROS is installed and properly sourced

try:
    import rclpy
except ModuleNotFoundError:
    raise ModuleNotFoundError(
        "rclpy package is not found!\n"
        "Please ensure ROS2 is installed and that you've sourced the appropriate setup script.\n"
        "    https://docs.ros.org/en/kilted/Installation.html"
    )
import atexit
import numpy as np
from math import pi
from pathlib import Path
from Karana.Core import discard
from Karana.Frame import FrameContainer
from Karana.Dynamics.SOADyn_types import MultibodyDS
from Karana.Math import IntegratorType
from Karana.Dynamics import (
    Multibody,
    PhysicalBody,
    PhysicalBody,
    HingeType,
    PhysicalHinge,
    PinSubhinge,
    StatePropagator,
    TimedEvent,
)
from Karana.Dynamics.SOADyn_types import (
    HingeDS,
    PinSubhingeDS,
    SubhingeDS,
    Linear3SubhingeDS,
    SphericalSubhingeDS,
    BodyWithContextDS,
    ScenePartSpecDS,
)
from Karana.Scene import (
    AssimpImporter,
    SphereGeometry,
    BoxGeometry,
    CylinderGeometry,
    StaticMeshGeometry,
    AbstractStaticGeometry,
    Color,
    PhysicalMaterialInfo,
    PhysicalMaterial,
    LAYER_COLLISION,
    LAYER_GRAPHICS,
    LAYER_ALL,
)
from Karana.Scene import CoalScene
from Karana.Scene import ProxyScene, ProxySceneNode, ProxyScenePart
from Karana.Math import UnitQuaternion, SpatialInertia, HomTran
from Karana.Models import UniformGravity, UpdateProxyScene, SyncRealTime, PenaltyContact
from Karana.Collision import FrameCollider, HuntCrossley

Build the filepath to the URDF file we will import

resources = Path().absolute().parent / "resources"
urdf_file = resources / "atrvjr" / "atrvjr.urdf"

Import and Modify the URDF#

Import the URDF file to create a SOADyn_types.MultibodyDS. The “DS” is short for “Data Structure”. This is kdFlex’s native, file-format agnostic way of specifying a multibody. We’ll use this later on to instantiate the multibody.

It’s easier to edit the model while it’s in the MultibodyDS format before we convert it to a Multibody. In the cell below we the original hinge between the rover and the root frame to a full 6-DOF hinge, which allows it to move freely. We then convert the hinges between the wheel bodies to pin joints so the wheels can properly rotate.

Last we set all the visuals in the MultibodyDS to belong to the graphics layer only so that it can’t interact with the wheels which we add later or the ground.

mb_info = MultibodyDS.fromUrdf(urdf_file)

mb_info.model_dump()

# convert to full6DOF hinge between root frame and rover.
mb_info.base_bodies[0].body.hinge = HingeDS(
    hinge_type=HingeType.FULL6DOF,
    subhinges=[
        Linear3SubhingeDS(prescribed=False),
        SphericalSubhingeDS(prescribed=False),
    ],
)

# unlock locked Hinges for wheels from the URDF
for i in [0, 1]:
    mb_info.base_bodies[0].children[i].body.hinge = HingeDS(
        hinge_type=HingeType.PIN,
        subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, 1.0]))],
    )
for i in [2, 3]:
    mb_info.base_bodies[0].children[i].body.hinge = HingeDS(
        hinge_type=HingeType.PIN,
        subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, -1.0]))],
    )


# Remove all collision meshes. We want to add these manually.
def removeCollision():
    """Remove all the collision meshes."""

    def _removeParts(parts: list[ScenePartSpecDS]):
        idx = []
        for k, p in enumerate(parts):
            if p.layers == LAYER_COLLISION:
                idx.append(k)
        idx.reverse()
        for k in idx:
            parts.pop(k)

    def _recurse(b: BodyWithContextDS):
        _removeParts(b.body.scene_parts)
        for c in b.children:
            _recurse(c)

    for base in mb_info.base_bodies:
        _recurse(base)


removeCollision()

Create a Karana.Frame.FrameContainer to manager the frames layer.

fc = FrameContainer("root")

Instantiate and configure the Karana.Dynamics.Multibody based on the MultibodyDS description.

mb = mb_info.toMultibody(fc)

Finalize the model to get it ready for use.

mb.ensureCurrent()
mb.resetData()
mb.dumpTree()
|-atrvjr_MBVROOT_
   |-[FULL6DOF] center_link
      |-[PIN] left_front_wheel_link
      |-[PIN] left_rear_wheel_link
      |-[PIN] right_front_wheel_link
      |-[PIN] right_rear_wheel_link
      |-[LOCKED] cbimu_frame
      |-[LOCKED] mpuimu_frame
      |-[LOCKED] arm_base_frame
      |-[LOCKED] lrf_frame
      |-[LOCKED] novatel

Setup the kdFlex Scene#

Next we setup kdflex’s visualizer by creating a graphics scene and a webserver to view it. We use the helper method setupGraphics() to take care of setting up the scene and webserver for us.

See Visualization and Scene Layer for more information relating to this section.

cleanup_graphics, web_scene = mb.setupGraphics(port=0)
web_scene.defaultCamera().pointCameraAt(
    offset=np.array([[0.0], [5.0], [10.0]]),
    target=np.array([[0.0], [0.0], [0.0]]),
    up=np.array([[0.0], [0.0], [1.0]]),
)
proxy_scene = mb.getScene()
[WebUI] Listening at http://newton:39223
# attach a collision scene
col_scene = CoalScene("collision_scene")
proxy_scene.registerClientScene(col_scene, mb.virtualRoot(), layers=LAYER_COLLISION)

Display the bodies in the multibody model.

mb.displayModel()
      Body                     Parent              Hinge    Dofs          
      ____                     ______              _____    ____          
   1. center_link              atrvjr_MBVROOT_    FULL6DOF  6/0           
   2. left_front_wheel_link    center_link          PIN     1/6           
   3. left_rear_wheel_link     center_link          PIN     1/7           
   4. right_front_wheel_link   center_link          PIN     1/8           
   5. right_rear_wheel_link    center_link          PIN     1/9           
   6. cbimu_frame              center_link         LOCKED   0/10          
   7. mpuimu_frame             center_link         LOCKED   0/10          
   8. arm_base_frame           center_link         LOCKED   0/10          
   9. lrf_frame                center_link         LOCKED   0/10          
  10. novatel                  center_link         LOCKED   0/10          
                                                            ____          
                                                            10            

Attach Wheel Geometries#

wheel_geom = CylinderGeometry(0.125, 0.075)

# create a ground for the rover to drive on
ground_geom = BoxGeometry(30, 30, 0.25)
left_front_wheel = mb.getBody("left_front_wheel_link")
# left_front_wheel.addScenePartSpec(wheel_obj.parts[0])
# left_front_wheel.addScenePartSpec(wheel_obj.parts[1])
left_front_wheel_node = ProxyScenePart(
    name="left_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
left_front_wheel_node.attachTo(left_front_wheel)
left_front_wheel_node.setTranslation([0, 0, 0.03])
left_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# left_front_wheel_node.graphics().showAxes(1.0)

left_rear_wheel = mb.getBody("left_rear_wheel_link")
# left_rear_wheel.addScenePartSpec(wheel_obj.parts[0])
# left_rear_wheel.addScenePartSpec(wheel_obj.parts[1])
left_rear_wheel_node = ProxyScenePart(
    "left_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
left_rear_wheel_node.attachTo(left_rear_wheel)
left_rear_wheel_node.setTranslation([0, 0, 0.03])
left_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# left_rear_wheel_node.graphics().showAxes(1.0)

right_front_wheel = mb.getBody("right_front_wheel_link")
# right_front_wheel.addScenePartSpec(wheel_obj.parts[0])
# right_front_wheel.addScenePartSpec(wheel_obj.parts[1])
right_front_wheel_node = ProxyScenePart(
    "right_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
right_front_wheel_node.attachTo(right_front_wheel)
right_front_wheel_node.setTranslation([0, 0, 0.03])
right_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# right_front_wheel_node.graphics().showAxes(1.0)

right_rear_wheel = mb.getBody("right_rear_wheel_link")
# right_rear_wheel.addScenePartSpec(wheel_obj.parts[0])
# right_rear_wheel.addScenePartSpec(wheel_obj.parts[1])
right_rear_wheel_node = ProxyScenePart(
    "right_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
right_rear_wheel_node.attachTo(right_rear_wheel)
right_rear_wheel_node.setTranslation([0, 0, 0.03])
right_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# right_rear_wheel_node.graphics().showAxes(1.0)

mat_info = PhysicalMaterialInfo()
mat_info.color = Color.WHITESMOKE
gray = PhysicalMaterial(mat_info)

ground = ProxyScenePart(
    "ground", scene=proxy_scene, geometry=ground_geom, material=gray, layers=LAYER_ALL
)
ground.setTranslation([0, 0, -0.1])

del ground, mat_info, wheel_geom, ground_geom, gray

Setup StatePropagator and models#

# set up state propagator
sp = StatePropagator(mb, IntegratorType.RK4)
integ = sp.getIntegrator()

# Create a UniformGravity model, and set it's gravitational acceleration.
ug = UniformGravity("grav_model", sp, mb)
ug.params.g = np.array([0, 0, -9.81])
del ug

# Makes sure the visualization scene is updated after each state change.
UpdateProxyScene("update_proxy_scene", sp, proxy_scene)

# sync the simulation time with real-time.
SyncRealTime("sync_real_time", sp, 1.0)

# add penalty contact model and register its bodies and parameters
hc = HuntCrossley("hunt_crossley_contact")
hc.params.kp = 100000
hc.params.kc = 20000
hc.params.mu = 0.3
hc.params.n = 1.5
hc.params.linear_region_tol = 1e-3
PenaltyContact("penalty_contact", sp, mb, [FrameCollider(proxy_scene, col_scene)], hc)
del col_scene, hc

Because forces are cleared out in between simulation steps, we create a callback function to apply forces depending on the intended direction. We attach it as a Karana.Dynamics.SpFunctions.pre_deriv_fns, since it is called before the multibody derivative which ensures the forces are used in the state update calculations.

direction = "coast"


def pre_deriv_fn(t, x):
    """Set torques based on the driving direction.

    Parameters
    ----------
    t : float
        The current time.
    x : NDArray[np.float64]
       The current state.
    """
    # manually set torques in pre_deriv function
    global direction
    match direction:
        case "coast":
            right_rear_wheel.parentHinge().subhinge(0).setT([0])
            left_rear_wheel.parentHinge().subhinge(0).setT([0])
            right_front_wheel.parentHinge().subhinge(0).setT([0])
            left_front_wheel.parentHinge().subhinge(0).setT([0])
        case "forward":
            right_rear_wheel.parentHinge().subhinge(0).setT([4])
            left_rear_wheel.parentHinge().subhinge(0).setT([4])
            right_front_wheel.parentHinge().subhinge(0).setT([4])
            left_front_wheel.parentHinge().subhinge(0).setT([4])
        case "backward":
            right_rear_wheel.parentHinge().subhinge(0).setT([-4])
            left_rear_wheel.parentHinge().subhinge(0).setT([-4])
            right_front_wheel.parentHinge().subhinge(0).setT([-4])
            left_front_wheel.parentHinge().subhinge(0).setT([-4])
        case "rotate_right":
            right_rear_wheel.parentHinge().subhinge(0).setT([-5])
            left_rear_wheel.parentHinge().subhinge(0).setT([5])
            right_front_wheel.parentHinge().subhinge(0).setT([-5])
            left_front_wheel.parentHinge().subhinge(0).setT([5])
        case "rotate_left":
            right_rear_wheel.parentHinge().subhinge(0).setT([5])
            left_rear_wheel.parentHinge().subhinge(0).setT([-5])
            right_front_wheel.parentHinge().subhinge(0).setT([5])
            left_front_wheel.parentHinge().subhinge(0).setT([-5])


sp.fns.pre_deriv_fns["apply_torque"] = pre_deriv_fn

Setup the ROS interface#

EasyPubSub is a wrapper for a ROS node and executor which:

  • Simplifies publishing and subscribing to messages

  • Simplifies integration with the StatePropagator using a KModel

from Karana.KUtils.ros import EasyPubSub

pubsub = EasyPubSub("sim")

Create a KModel to communicate with ROS#

This model will periodically check for

from Karana.Models import KModel

from geometry_msgs.msg import PoseStamped
from std_msgs.msg import String
from builtin_interfaces.msg import Time


class RosComm(KModel):
    """ROS communication KModel."""

    def __init__(self, name: str):
        """Create a RosComm instance.

        Parameters
        ----------
        name : str
            The name of the RosComm KModel.
        """
        super().__init__(name, sp)
        sp.registerModel(self)
        self.pubsub = pubsub
        pubsub.subscribe("/command", String, self._handleCommand)

    def _handleCommand(self, cmd: String):
        if cmd.data not in ["coast", "forward", "backward", "rotate_right", "rotate_left"]:
            return
        global direction
        direction = cmd.data

    def _buildPoseStamped(self, t):
        ns = int(t.astype("timedelta64[ns]").astype(int))
        sec = ns // 1_000_000_000
        nsec = ns % 1_000_000_000
        root = mb.virtualRoot()
        chassis = mb.getBody("center_link")
        r2c = root.frameToFrame(chassis).relTransform()
        r2c_qvec = r2c.getUnitQuaternion().toVector4()

        msg = PoseStamped()
        msg.header.stamp = Time(sec=sec, nanosec=nsec)
        msg.header.frame_id = chassis.name()
        msg.pose.position.x = r2c.getTranslation()[0]
        msg.pose.position.y = r2c.getTranslation()[1]
        msg.pose.position.z = r2c.getTranslation()[2]
        msg.pose.orientation.x = r2c_qvec[0]
        msg.pose.orientation.y = r2c_qvec[1]
        msg.pose.orientation.z = r2c_qvec[2]
        msg.pose.orientation.w = r2c_qvec[3]
        return msg

    def postModelStep(self, t, x):
        """Process ROS subscriptions and publish the pose message.

        Parameters
        ----------
        t : float
            The current time.
        x : NDArray[np.float64]
           The current state.
        """
        pubsub.processSubscriptions()
        pose = self._buildPoseStamped(t)
        self.pubsub.publish("/pose", pose)

Setup the Simulation#

Finish by setting the initial states and timed events

# set integrator state
t_init = np.timedelta64(0, "ns")
x_init = sp.assembleState()
sp.setTime(t_init)
sp.setState(x_init)

# syncs up the graphics
proxy_scene.update()

# register timed event
h = np.timedelta64(int(1e7), "ns")
t = TimedEvent("hop_size", h, lambda _: None, False)
t.period = h
sp.registerTimedEvent(t)

roscomm = RosComm("ros_comm")
roscomm.setPeriod(0.1)

del t

Subscribe to and print the /pose topic#

This is to demonstrate the PoseStamped messages being published by the sim

def printPosition(pose):
    """Print the position from the given pose.

    Parameters
    ----------
    pose :
        The pose whose position will be printed.
    """
    sec = pose.header.stamp.sec
    nsec = pose.header.stamp.nanosec
    t = sec + 1e-9 * nsec
    position = np.array([pose.pose.position.x, pose.pose.position.y, pose.pose.position.z])
    print(f"{t=:.5g} {position}")


pubsub.subscribe("/pose", PoseStamped, printPosition)
'e54bf411-fafb-48a8-8dd7-7a8fb8c7e830'

Run and Clean Up#

sp.advanceBy(1.0)
t=0.1 [6.21659043e-06 1.25010413e-05 1.25902848e-01]
t=0.2 [ 3.66838531e-05 -1.51171786e-04  1.49291143e-01]
t=0.3 [ 2.15317170e-05 -1.70409927e-04  1.28074954e-01]
t=0.4 [-9.26186880e-05 -2.23504113e-04  1.46480777e-01]
t=0.5 [-8.03233143e-05 -2.02847167e-04  1.30706999e-01]
t=0.6 [-9.61293833e-05 -1.74686984e-04  1.43464872e-01]
t=0.7 [ 4.44683424e-05 -1.67816961e-04  1.33616218e-01]
t=0.8 [ 0.00019843 -0.00019242  0.14036566]
t=0.9 [ 0.00042372 -0.00015653  0.1363341 ]
<SpStatusEnum.REACHED_END_TIME: 1>
 Nodes
 ---

Publish a new driving command#

The RosComm model will receive this message and update the driving mode upon the next simulation step

!ros2 topic pub --once /command std_msgs/msg/String "data: forward"
publisher: beginning loop
publishing #1: std_msgs.msg.String(data='forward')
sp.advanceBy(5.0)
t=1 [ 0.00067868 -0.0001608   0.13759262]
t=1.1 [ 1.00348693e-03 -1.24044077e-04  1.38446879e-01]
t=1.2 [ 3.16128932e-03 -1.01202302e-04  1.35636614e-01]
t=1.3 [ 0.00903308 -0.00030654  0.13982312]
t=1.4 [1.93564319e-02 4.94279856e-05 1.34550902e-01]
t=1.5 [0.03382264 0.00055203 0.14015249]
t=1.6 [0.05241513 0.00080046 0.13459761]
t=1.7 [0.07501351 0.00070825 0.13975692]
t=1.8 [0.10148733 0.00051264 0.13509404]
t=1.9 [0.13229783 0.000572   0.13908449]
t=2 [0.16693539 0.00064599 0.135986  ]
t=2.1 [0.20597232 0.00104475 0.13807206]
t=2.2 [0.24913292 0.00162681 0.1368598 ]
t=2.3 [0.29629743 0.00172614 0.13741043]
t=2.4 [0.34736463 0.00228526 0.13741304]
t=2.5 [0.40265159 0.00224405 0.13680302]
t=2.6 [0.46176137 0.0021196  0.13793199]
t=2.7 [0.52509566 0.0016836  0.1364957 ]
t=2.8 [0.59257062 0.0012306  0.13799747]
t=2.9 [0.664086   0.00084548 0.13651509]
t=3 [7.39702558e-01 3.07849564e-04 1.37969850e-01]
t=3.1 [ 8.19332345e-01 -2.34610044e-05  1.36587207e-01]
t=3.2 [ 9.03141931e-01 -4.34932976e-04  1.37760275e-01]
t=3.3 [ 9.90866895e-01 -8.30675657e-04  1.36915116e-01]
t=3.4 [ 1.08268293 -0.00116761  0.137431  ]
t=3.5 [ 1.17872395 -0.00142385  0.1371321 ]
t=3.6 [ 1.2785882  -0.00161248  0.13729714]
t=3.7 [ 1.38246175 -0.00174713  0.13729834]
t=3.8 [ 1.49054171 -0.00172436  0.13708652]
t=3.9 [ 1.60271744 -0.00207118  0.13745296]
t=4 [ 1.71897401 -0.00199916  0.13704639]
t=4.1 [ 1.83928576e+00 -1.80944815e-03  1.37413413e-01]
t=4.2 [ 1.96367247e+00 -1.56610367e-03  1.37045459e-01]
t=4.3 [ 2.09209076e+00 -1.27890947e-03  1.37461748e-01]
t=4.4 [ 2.22446623e+00 -1.19796977e-03  1.37038496e-01]
t=4.5 [ 2.36122349e+00 -5.97530773e-04  1.37379644e-01]
t=4.6 [ 2.50197298e+00 -1.15804405e-07  1.37157541e-01]
t=4.7 [2.64684918e+00 6.09325254e-04 1.37288237e-01]
t=4.8 [2.79585573e+00 1.20567381e-03 1.37193755e-01]
t=4.9 [2.94881131e+00 1.54930840e-03 1.37247962e-01]
t=5 [3.10592827e+00 2.12318854e-03 1.37283282e-01]
t=5.1 [3.26717218e+00 2.59836175e-03 1.37151740e-01]
t=5.2 [3.43250499e+00 3.11107483e-03 1.37314879e-01]
t=5.3 [3.60190120e+00 3.57585951e-03 1.37182003e-01]
t=5.4 [3.77533707 0.00404745 0.13728283]
t=5.5 [3.95282637 0.00426584 0.13717628]
t=5.6 [4.13439729 0.00443055 0.13729539]
t=5.7 [4.32013097 0.00495711 0.13718959]
t=5.8 [4.50995117 0.00520905 0.13724915]
t=5.9 [4.70371602 0.0050414  0.13723777]
<SpStatusEnum.REACHED_END_TIME: 1>
# Cleanup
def cleanup():
    """Cleanup the simulation."""
    global left_front_wheel, left_front_wheel_node, left_rear_wheel, left_rear_wheel_node, right_rear_wheel, right_rear_wheel_node, right_front_wheel, right_front_wheel_node, proxy_scene, mb_info, integ, web_scene
    del (
        left_front_wheel,
        left_front_wheel_node,
        left_rear_wheel,
        left_rear_wheel_node,
        right_rear_wheel,
        right_rear_wheel_node,
        right_front_wheel,
        right_front_wheel_node,
        proxy_scene,
        web_scene,
        mb_info,
        integ,
    )

    discard(sp)
    cleanup_graphics()

    discard(mb)
    discard(fc)


atexit.register(cleanup)
<function __main__.cleanup()>

Summary#

You now understand how to drive a ATRVjr model by applying torque forces in the PreDeriv call.