ATRVjr ROS Integration#
In this notebook we import the ATRVjr rover model into kdflex and interface with ROS. We will publish messages to ROS as the sim advances as well as listen for a ROS message to command the vehicle’s driving mode.
Requirements:
In this tutorial we will:
Import and modify the URDF
Setup the kdFlex scene
Attach wheel geometries
Setup StatePropagator and models
Setup the Simulation
Integrate with ROS
Run and clean up
Acknowledgements
The robot URDF model included in this project is Copyright (c) 2018 diasdm and licensed under the BSD 2‑Clause License (link). Some modifications were made to adapt the model for use within the kdflex simulation framework, including simplifying wheel geometries due to current rendering and collision handling limitations.
For a more in-depth descriptions of kdflex concepts see usage.
First let’s confirm that ROS is installed and properly sourced
try:
import rclpy
except ModuleNotFoundError:
raise ModuleNotFoundError(
"rclpy package is not found!\n"
"Please ensure ROS2 is installed and that you've sourced the appropriate setup script.\n"
" https://docs.ros.org/en/kilted/Installation.html"
)
import atexit
import numpy as np
from math import pi
from pathlib import Path
from Karana.Core import discard
from Karana.Frame import FrameContainer
from Karana.Dynamics.SOADyn_types import MultibodyDS
from Karana.Math import IntegratorType
from Karana.Dynamics import (
Multibody,
PhysicalBody,
PhysicalBody,
HingeType,
PhysicalHinge,
PinSubhinge,
StatePropagator,
TimedEvent,
)
from Karana.Dynamics.SOADyn_types import (
HingeDS,
PinSubhingeDS,
SubhingeDS,
Linear3SubhingeDS,
SphericalSubhingeDS,
BodyWithContextDS,
ScenePartSpecDS,
)
from Karana.Scene import (
AssimpImporter,
SphereGeometry,
BoxGeometry,
CylinderGeometry,
StaticMeshGeometry,
AbstractStaticGeometry,
Color,
PhysicalMaterialInfo,
PhysicalMaterial,
LAYER_COLLISION,
LAYER_GRAPHICS,
LAYER_ALL,
)
from Karana.Scene import CoalScene
from Karana.Scene import ProxyScene, ProxySceneNode, ProxyScenePart
from Karana.Math import UnitQuaternion, SpatialInertia, HomTran
from Karana.Models import UniformGravity, UpdateProxyScene, SyncRealTime, PenaltyContact
from Karana.Collision import FrameCollider, HuntCrossley
Build the filepath to the URDF file we will import
resources = Path().absolute().parent / "resources"
urdf_file = resources / "atrvjr" / "atrvjr.urdf"
Import and Modify the URDF#
Import the URDF file to create a SOADyn_types.MultibodyDS. The “DS” is short for “Data Structure”. This is kdFlex’s native, file-format agnostic way of specifying a multibody. We’ll use this later on to instantiate the multibody.
It’s easier to edit the model while it’s in the MultibodyDS format before we convert it to a Multibody. In the cell below we the original hinge between the rover and the root frame to a full 6-DOF hinge, which allows it to move freely. We then convert the hinges between the wheel bodies to pin joints so the wheels can properly rotate.
Last we set all the visuals in the MultibodyDS to belong to the graphics layer only so that it can’t interact with the wheels which we add later or the ground.
mb_info = MultibodyDS.fromUrdf(urdf_file)
mb_info.model_dump()
# convert to full6DOF hinge bewteen root frame and rover.
mb_info.base_bodies[0].body.hinge = HingeDS(
hinge_type=HingeType.FULL6DOF,
subhinges=[
Linear3SubhingeDS(prescribed=False),
SphericalSubhingeDS(prescribed=False),
],
)
# unlock locked Hinges for wheels from the URDF
for i in [0, 1]:
mb_info.base_bodies[0].children[i].body.hinge = HingeDS(
hinge_type=HingeType.PIN,
subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, 1.0]))],
)
for i in [2, 3]:
mb_info.base_bodies[0].children[i].body.hinge = HingeDS(
hinge_type=HingeType.PIN,
subhinges=[PinSubhingeDS(prescribed=False, unit_axis=np.array([0.0, 0.0, -1.0]))],
)
# Remove all collision meshes. We want to add these manually.
def removeCollision():
"""Remove all the collision meshes."""
def _removeParts(parts: list[ScenePartSpecDS]):
idx = []
for k, p in enumerate(parts):
if p.layers == LAYER_COLLISION:
idx.append(k)
idx.reverse()
for k in idx:
parts.pop(k)
def _recurse(b: BodyWithContextDS):
_removeParts(b.body.scene_parts)
for c in b.children:
_recurse(c)
for base in mb_info.base_bodies:
_recurse(base)
removeCollision()
Create a Karana.Frame.FrameContainer to manager the frames layer.
fc = FrameContainer("root")
Instantiate and configure the Karana.Dynamics.Multibody based on the MultibodyDS description.
mb = mb_info.toMultibody(fc)
Finalize the model to get it ready for use.
mb.ensureCurrent()
mb.resetData()
mb.dumpTree()
|-atrvjr_MBVROOT_
|-[FULL6DOF] center_link
|-[PIN] left_front_wheel_link
|-[PIN] left_rear_wheel_link
|-[PIN] right_front_wheel_link
|-[PIN] right_rear_wheel_link
|-[LOCKED] cbimu_frame
|-[LOCKED] mpuimu_frame
|-[LOCKED] arm_base_frame
|-[LOCKED] lrf_frame
|-[LOCKED] novatel
Setup the kdFlex Scene#
Next we setup kdflex’s visualizer by creating a graphics scene and a webserver to view it. We use the helper method setupGraphics() to take care of setting up the scene and webserver for us.
See Visualization and SceneLayer for more information relating to this section.
cleanup_graphics, web_scene = mb.setupGraphics(port=0)
web_scene.defaultCamera().pointCameraAt(
offset=np.array([[0.0], [5.0], [10.0]]),
target=np.array([[0.0], [0.0], [0.0]]),
up=np.array([[0.0], [0.0], [1.0]]),
)
proxy_scene = mb.getScene()
[WebUI] Listening at http://newton:39223
# attach a collision scene
col_scene = CoalScene("collision_scene")
proxy_scene.registerClientScene(col_scene, mb.virtualRoot(), layers=LAYER_COLLISION)
Display the bodies in the multibody model.
mb.displayModel()
Body Parent Hinge Dofs
____ ______ _____ ____
1. center_link atrvjr_MBVROOT_ FULL6DOF 6/0
2. left_front_wheel_link center_link PIN 1/6
3. left_rear_wheel_link center_link PIN 1/7
4. right_front_wheel_link center_link PIN 1/8
5. right_rear_wheel_link center_link PIN 1/9
6. cbimu_frame center_link LOCKED 0/10
7. mpuimu_frame center_link LOCKED 0/10
8. arm_base_frame center_link LOCKED 0/10
9. lrf_frame center_link LOCKED 0/10
10. novatel center_link LOCKED 0/10
____
10
Attach Wheel Geometries#
wheel_geom = CylinderGeometry(0.125, 0.075)
# create a ground for the rover to drive on
ground_geom = BoxGeometry(30, 30, 0.25)
# # Importing the actual wheel mesh. Using sphere's instead though since these are wonky for collision dynamics
# wheel_file = "/home/griffith/workspace/MySandbox/src/Tutorials/test/resources/atrvjr/meshes/wheel.obj"
# importer = AssimpImporter()
# wheel_obj = importer.importFrom(wheel_file)
left_front_wheel = mb.getBody("left_front_wheel_link")
# left_front_wheel.addScenePartSpec(wheel_obj.parts[0])
# left_front_wheel.addScenePartSpec(wheel_obj.parts[1])
left_front_wheel_node = ProxyScenePart(
name="left_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
left_front_wheel_node.attachTo(left_front_wheel)
left_front_wheel_node.setTranslation([0, 0, 0.03])
left_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# left_front_wheel_node.graphics().showAxes(1.0)
left_rear_wheel = mb.getBody("left_rear_wheel_link")
# left_rear_wheel.addScenePartSpec(wheel_obj.parts[0])
# left_rear_wheel.addScenePartSpec(wheel_obj.parts[1])
left_rear_wheel_node = ProxyScenePart(
"left_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
left_rear_wheel_node.attachTo(left_rear_wheel)
left_rear_wheel_node.setTranslation([0, 0, 0.03])
left_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# left_rear_wheel_node.graphics().showAxes(1.0)
right_front_wheel = mb.getBody("right_front_wheel_link")
# right_front_wheel.addScenePartSpec(wheel_obj.parts[0])
# right_front_wheel.addScenePartSpec(wheel_obj.parts[1])
right_front_wheel_node = ProxyScenePart(
"right_front_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
right_front_wheel_node.attachTo(right_front_wheel)
right_front_wheel_node.setTranslation([0, 0, 0.03])
right_front_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# right_front_wheel_node.graphics().showAxes(1.0)
right_rear_wheel = mb.getBody("right_rear_wheel_link")
# right_rear_wheel.addScenePartSpec(wheel_obj.parts[0])
# right_rear_wheel.addScenePartSpec(wheel_obj.parts[1])
right_rear_wheel_node = ProxyScenePart(
"right_rear_wheel_link_node", scene=proxy_scene, geometry=wheel_geom
)
right_rear_wheel_node.attachTo(right_rear_wheel)
right_rear_wheel_node.setTranslation([0, 0, 0.03])
right_rear_wheel_node.setUnitQuaternion(UnitQuaternion(pi / 2, [1.0, 0.0, 0.0]))
# right_rear_wheel_node.graphics().showAxes(1.0)
mat_info = PhysicalMaterialInfo()
mat_info.color = Color.WHITESMOKE
gray = PhysicalMaterial(mat_info)
# # exporter currently struggles to export the full multibody
# multibodyDS = mb.toDS()
# multibodyDS.toFile("atrvjr.json")
ground = ProxyScenePart(
"ground", scene=proxy_scene, geometry=ground_geom, material=gray, layers=LAYER_ALL
)
ground.setTranslation([0, 0, -0.1])
del ground, mat_info, wheel_geom, ground_geom, gray
Setup StatePropagator and models#
# set up state propagator
sp = StatePropagator(mb, IntegratorType.RK4)
integ = sp.getIntegrator()
# Create a UniformGravity model, and set it's gravitational acceleration.
ug = UniformGravity("grav_model", sp, mb)
ug.params.g = np.array([0, 0, -9.81])
del ug
# Makes sure the visualization scene is updated after each state change.
UpdateProxyScene("update_proxy_scene", sp, proxy_scene)
# sync the simulation time with real-time.
SyncRealTime("sync_real_time", sp, 1.0)
# add penalty contact model and register its bodies and parameters
hc = HuntCrossley("hunt_crossley_contact")
hc.params.kp = 100000
hc.params.kc = 20000
hc.params.mu = 0.3
hc.params.n = 1.5
hc.params.linear_region_tol = 1e-3
PenaltyContact("penalty_contact", sp, mb, [FrameCollider(proxy_scene, col_scene)], hc)
del col_scene, hc
Because forces are cleared out in between simulation steps, we create a callback function to apply forces depending on the intended direction. We attach it as a Karana.Dynamics.spFunctions.pre_deriv_fns, since it is called before the multibody derivative which ensures the forces are used in the state update calculations.
direction = "coast"
def pre_deriv_fn(t, x):
"""Set torques based on the driving direction.
Parameters
----------
t : float
The current time.
x : NDArray[np.float64]
The current state.
"""
# manually set torques in pre_deriv function
global direction
match direction:
case "coast":
right_rear_wheel.parentHinge().subhinge(0).setT([0])
left_rear_wheel.parentHinge().subhinge(0).setT([0])
right_front_wheel.parentHinge().subhinge(0).setT([0])
left_front_wheel.parentHinge().subhinge(0).setT([0])
case "forward":
right_rear_wheel.parentHinge().subhinge(0).setT([4])
left_rear_wheel.parentHinge().subhinge(0).setT([4])
right_front_wheel.parentHinge().subhinge(0).setT([4])
left_front_wheel.parentHinge().subhinge(0).setT([4])
case "backward":
right_rear_wheel.parentHinge().subhinge(0).setT([-4])
left_rear_wheel.parentHinge().subhinge(0).setT([-4])
right_front_wheel.parentHinge().subhinge(0).setT([-4])
left_front_wheel.parentHinge().subhinge(0).setT([-4])
case "rotate_right":
right_rear_wheel.parentHinge().subhinge(0).setT([-5])
left_rear_wheel.parentHinge().subhinge(0).setT([5])
right_front_wheel.parentHinge().subhinge(0).setT([-5])
left_front_wheel.parentHinge().subhinge(0).setT([5])
case "rotate_left":
right_rear_wheel.parentHinge().subhinge(0).setT([5])
left_rear_wheel.parentHinge().subhinge(0).setT([-5])
right_front_wheel.parentHinge().subhinge(0).setT([5])
left_front_wheel.parentHinge().subhinge(0).setT([-5])
sp.fns.pre_deriv_fns["apply_torque"] = pre_deriv_fn
Setup the ROS interface#
EasyPubSub is a wrapper for a ROS node and executor which:
Simplifies publishing and subscribing to messages
Simplifies integration with the
StatePropagatorusing aKModel
from Karana.KUtils.ros import EasyPubSub
pubsub = EasyPubSub("sim")
Create a KModel to communicate with ROS#
This model will periodically check for
from Karana.Models import KModel
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import String
from builtin_interfaces.msg import Time
class RosComm(KModel):
"""ROS communication KModel."""
def __init__(self, name: str):
"""Create a RosComm instance.
Parameters
----------
name : str
The name of the RosComm KModel.
"""
super().__init__(name, sp)
sp.registerModel(self)
self.pubsub = pubsub
pubsub.subscribe("/command", String, self._handleCommand)
def _handleCommand(self, cmd: String):
if cmd.data not in ["coast", "forward", "backward", "rotate_right", "rotate_left"]:
return
global direction
direction = cmd.data
def _buildPoseStamped(self, t):
ns = int(t.astype("timedelta64[ns]").astype(int))
sec = ns // 1_000_000_000
nsec = ns % 1_000_000_000
root = mb.virtualRoot()
chassis = mb.getBody("center_link")
r2c = root.frameToFrame(chassis).relTransform()
r2c_qvec = r2c.getUnitQuaternion().toVector4()
msg = PoseStamped()
msg.header.stamp = Time(sec=sec, nanosec=nsec)
msg.header.frame_id = chassis.name()
msg.pose.position.x = r2c.getTranslation()[0]
msg.pose.position.y = r2c.getTranslation()[1]
msg.pose.position.z = r2c.getTranslation()[2]
msg.pose.orientation.x = r2c_qvec[0]
msg.pose.orientation.y = r2c_qvec[1]
msg.pose.orientation.z = r2c_qvec[2]
msg.pose.orientation.w = r2c_qvec[3]
return msg
def postModelStep(self, t, x):
"""Process ROS subscriptions and publish the pose message.
Parameters
----------
t : float
The current time.
x : NDArray[np.float64]
The current state.
"""
pubsub.processSubscriptions()
pose = self._buildPoseStamped(t)
self.pubsub.publish("/pose", pose)
Setup the Simulation#
Finish by setting the initial states and timed events
# set integrator state
t_init = np.timedelta64(0, "ns")
x_init = sp.assembleState()
sp.setTime(t_init)
sp.setState(x_init)
# syncs up the graphics
proxy_scene.update()
# register timed event
h = np.timedelta64(int(1e7), "ns")
t = TimedEvent("hop_size", h, lambda _: None, False)
t.period = h
sp.registerTimedEvent(t)
roscomm = RosComm("ros_comm")
roscomm.setPeriod(0.1)
del t
Subscribe to and print the /pose topic#
This is to demonstrate the PoseStamped messages being published by the sim
def printPosition(pose):
"""Print the position from the given pose.
Parameters
----------
pose :
The pose whose position will be printed.
"""
sec = pose.header.stamp.sec
nsec = pose.header.stamp.nanosec
t = sec + 1e-9 * nsec
position = np.array([pose.pose.position.x, pose.pose.position.y, pose.pose.position.z])
print(f"{t=:.5g} {position}")
pubsub.subscribe("/pose", PoseStamped, printPosition)
'e54bf411-fafb-48a8-8dd7-7a8fb8c7e830'
Run and Clean Up#
sp.advanceBy(1.0)
t=0.1 [6.21659043e-06 1.25010413e-05 1.25902848e-01]
t=0.2 [ 3.66838531e-05 -1.51171786e-04 1.49291143e-01]
t=0.3 [ 2.15317170e-05 -1.70409927e-04 1.28074954e-01]
t=0.4 [-9.26186880e-05 -2.23504113e-04 1.46480777e-01]
t=0.5 [-8.03233143e-05 -2.02847167e-04 1.30706999e-01]
t=0.6 [-9.61293833e-05 -1.74686984e-04 1.43464872e-01]
t=0.7 [ 4.44683424e-05 -1.67816961e-04 1.33616218e-01]
t=0.8 [ 0.00019843 -0.00019242 0.14036566]
t=0.9 [ 0.00042372 -0.00015653 0.1363341 ]
<SpStatusEnum.REACHED_END_TIME: 1>
Nodes
---
Publish a new driving command#
The RosComm model will receive this message and update the driving mode upon the next simulation step
!ros2 topic pub --once /command std_msgs/msg/String "data: forward"
publisher: beginning loop
publishing #1: std_msgs.msg.String(data='forward')
sp.advanceBy(5.0)
t=1 [ 0.00067868 -0.0001608 0.13759262]
t=1.1 [ 1.00348693e-03 -1.24044077e-04 1.38446879e-01]
t=1.2 [ 3.16128932e-03 -1.01202302e-04 1.35636614e-01]
t=1.3 [ 0.00903308 -0.00030654 0.13982312]
t=1.4 [1.93564319e-02 4.94279856e-05 1.34550902e-01]
t=1.5 [0.03382264 0.00055203 0.14015249]
t=1.6 [0.05241513 0.00080046 0.13459761]
t=1.7 [0.07501351 0.00070825 0.13975692]
t=1.8 [0.10148733 0.00051264 0.13509404]
t=1.9 [0.13229783 0.000572 0.13908449]
t=2 [0.16693539 0.00064599 0.135986 ]
t=2.1 [0.20597232 0.00104475 0.13807206]
t=2.2 [0.24913292 0.00162681 0.1368598 ]
t=2.3 [0.29629743 0.00172614 0.13741043]
t=2.4 [0.34736463 0.00228526 0.13741304]
t=2.5 [0.40265159 0.00224405 0.13680302]
t=2.6 [0.46176137 0.0021196 0.13793199]
t=2.7 [0.52509566 0.0016836 0.1364957 ]
t=2.8 [0.59257062 0.0012306 0.13799747]
t=2.9 [0.664086 0.00084548 0.13651509]
t=3 [7.39702558e-01 3.07849564e-04 1.37969850e-01]
t=3.1 [ 8.19332345e-01 -2.34610044e-05 1.36587207e-01]
t=3.2 [ 9.03141931e-01 -4.34932976e-04 1.37760275e-01]
t=3.3 [ 9.90866895e-01 -8.30675657e-04 1.36915116e-01]
t=3.4 [ 1.08268293 -0.00116761 0.137431 ]
t=3.5 [ 1.17872395 -0.00142385 0.1371321 ]
t=3.6 [ 1.2785882 -0.00161248 0.13729714]
t=3.7 [ 1.38246175 -0.00174713 0.13729834]
t=3.8 [ 1.49054171 -0.00172436 0.13708652]
t=3.9 [ 1.60271744 -0.00207118 0.13745296]
t=4 [ 1.71897401 -0.00199916 0.13704639]
t=4.1 [ 1.83928576e+00 -1.80944815e-03 1.37413413e-01]
t=4.2 [ 1.96367247e+00 -1.56610367e-03 1.37045459e-01]
t=4.3 [ 2.09209076e+00 -1.27890947e-03 1.37461748e-01]
t=4.4 [ 2.22446623e+00 -1.19796977e-03 1.37038496e-01]
t=4.5 [ 2.36122349e+00 -5.97530773e-04 1.37379644e-01]
t=4.6 [ 2.50197298e+00 -1.15804405e-07 1.37157541e-01]
t=4.7 [2.64684918e+00 6.09325254e-04 1.37288237e-01]
t=4.8 [2.79585573e+00 1.20567381e-03 1.37193755e-01]
t=4.9 [2.94881131e+00 1.54930840e-03 1.37247962e-01]
t=5 [3.10592827e+00 2.12318854e-03 1.37283282e-01]
t=5.1 [3.26717218e+00 2.59836175e-03 1.37151740e-01]
t=5.2 [3.43250499e+00 3.11107483e-03 1.37314879e-01]
t=5.3 [3.60190120e+00 3.57585951e-03 1.37182003e-01]
t=5.4 [3.77533707 0.00404745 0.13728283]
t=5.5 [3.95282637 0.00426584 0.13717628]
t=5.6 [4.13439729 0.00443055 0.13729539]
t=5.7 [4.32013097 0.00495711 0.13718959]
t=5.8 [4.50995117 0.00520905 0.13724915]
t=5.9 [4.70371602 0.0050414 0.13723777]
<SpStatusEnum.REACHED_END_TIME: 1>
# Cleanup
def cleanup():
"""Cleanup the simulation."""
global left_front_wheel, left_front_wheel_node, left_rear_wheel, left_rear_wheel_node, right_rear_wheel, right_rear_wheel_node, right_front_wheel, right_front_wheel_node, proxy_scene, mb_info, integ, web_scene
del (
left_front_wheel,
left_front_wheel_node,
left_rear_wheel,
left_rear_wheel_node,
right_rear_wheel,
right_rear_wheel_node,
right_front_wheel,
right_front_wheel_node,
proxy_scene,
web_scene,
mb_info,
integ,
)
discard(sp)
cleanup_graphics()
discard(mb)
discard(fc)
atexit.register(cleanup)
<function __main__.cleanup()>
Summary#
You now understand how to drive a ATRVjr model by applying torque forces in the PreDeriv call.