Data Logging#
This notebook explores kdflex’s built in data logging capabilities using a 2-link pendulum example multibody. We start with generic non-simulation data logging to H5 files using Karana.KUtils.H5Writer. Then we will demonstrate automatically logging sim data using the Karana.Models.DataLogger. This requires going through the typical process of setting up a multibody and running the simulation.
Requirements:
In this tutorial we will:
Setup the PacketTables
Log using H5Writer
Read the H5 file using H5py
Setup a 2-link pendulum
Attach a DataLogger model
Run the simulation
Read logging output
Clean up the simulation
For a more in-depth descriptions of kdflex concepts see usage.
import numpy as np
import atexit
from typing import cast
from math import pi
from Karana.Core import discard, allFinalized
from Karana.Frame import FrameContainer
from Karana.Math import IntegratorType
from Karana.Dynamics import (
Multibody,
PhysicalBody,
PhysicalBody,
HINGE_TYPE,
StatePropagator,
TimedEvent,
PhysicalBodyParams,
)
from Karana.Math import SpatialInertia, HomTran
from Karana.Models import UniformGravity, UpdateProxyScene, SyncRealTime
from Karana.Scene import (
BoxGeometry,
CylinderGeometry,
Color,
PhysicalMaterialInfo,
PhysicalMaterial,
ScenePartSpec,
)
from Karana.Models import DataLogger
from Karana.KUtils import H5Writer, PacketTableConfig
Generic Non-Simulation Data Logging#
We don’t necessarily need to be running a simulation to use kdflex’s logging capabilities. The following cells demonstrate how to use kdflex’s data logging capabilities without running a simulation. See later in the notebook for how to log data from a simulation.
Setup the PacketTables#
PacketTables are the tables that store data in the H5 log file.We use Karana.KUtils.PacketTableConfig to define the columns and associated update functions for a PacketTable.
# Here we create two packet tables configs which we will use to create the tables to store data in
# We use two tables here. This tests whether we can use a group that already exists, i.e., mygroup will
# already be created by c and should be re-used by c2.
c_gen = PacketTableConfig("mygroup/inner_group/data")
c2_gen = PacketTableConfig("mygroup/data")
For our case we next create functions for tracking example values. Each time these functions are called the data updates, simulating what our values might look like in a simulation. This is just to simulate what some sort of value we are interested in logging will look like. These functions are then added to the packet table configs.
# float
t = 0.0
def time():
global t
t += 0.1
return t
# adding float data to be recorded in the table
c_gen.addDataFloat("time", time)
# integer
i = 1
def int_f() -> int:
global i
i += 1
return i
# adding integer data to be recorded in the table
c_gen.addDataInt("int", int_f)
# dynamic vector
v = np.zeros(2)
def vec():
global v
v[0] += 1.0
v[1] += 2.0
return v
# adding a dynamic vector to be recorded in the table
c_gen.addData("vec", vec, 2, as_scalars=True)
# dynamic matrix
m = np.zeros((3, 2))
def mat():
global m
m[0, 0] += 1.0
return m
# adding a dynamic matrix to be recorded in both table configs
c_gen.addData("mat", mat, 3, 2)
c2_gen.addData("mat", mat, 3, 2)
Log Using H5Writer#
We now create the Karana.KUtils.H5Writer and specify a file to output data to. We create the packet tables from the packet table config and then log the data by calling Karana.KUtils.H5Writer.log().
# create logger and tables
h5 = H5Writer("example.h5")
h5.createTable(c_gen)
h5.createTable(c2_gen)
# log data three times
h5.log()
h5.log()
h5.log()
Below we demonstrate table naming and how to deactivate an active table. Logging will only add data to active tables.
# check active table names
print(f"Active table names: {", ".join(h5.activeTableNames())}")
# deactivate table
h5.deactivateTable("mygroup/inner_group/data")
# check active table names
print(f"Active table names: {", ".join(h5.activeTableNames())}")
# log only active tables
h5.log()
# logging an inactive table via logTable
h5.deactivateTable("mygroup/data")
h5.logTable("mygroup/data")
# activate a new table
h5.activateTable("mygroup/data")
h5.log()
Active table names: mygroup/inner_group/data, mygroup/data
Active table names: mygroup/data
Read the H5 File Using h5py#
Let’s view the data we just logged. In order to do this we need to delete the H5 writer so that it releases its lock on the HDF5 file. We also need to delete the corresponding PacketTableConfig in order to properly delete the H5 writer.
We then specify the output file we want to read, and print out the keys for the data.
import h5py
del c_gen, c2_gen, h5
# load the file
f = h5py.File("example.h5", "r")
f.keys()
<KeysViewHDF5 ['mygroup']>
The HDF5 is structured in data groups which we specified earlier. The following cells depict how to parse and unpack this data.
# Data can be accessed like a dictionary using keys.
# The following print statements illustrate the
# structure of the hdf5 file and how to access the data
# Show keys for mygroup
print(f["mygroup"].keys())
print(f["mygroup"]["data"].dtype)
matrix_data = f["mygroup"]["data"]["mat"][:]
# Show keys for inner_group
print(f["mygroup"]["inner_group"].keys())
dtype = f["mygroup"]["inner_group"]["data"].dtype
print(f"Data types logged in the inner group: {dtype}")
<KeysViewHDF5 ['data', 'inner_group']>
[('mat', '<f8', (3, 2))]
<KeysViewHDF5 ['data']>
Data types logged in the inner group: [('time', '<f8'), ('int', '<i4'), ('vec_0', '<f8'), ('vec_1', '<f8'), ('mat', '<f8', (3, 2))]
# Print out logged data
time_data = f["mygroup"]["inner_group"]["data"]["time"][:]
integer_data = f["mygroup"]["inner_group"]["data"]["int"][:]
vec_0_data = f["mygroup"]["inner_group"]["data"]["vec_0"][:]
vec_1_data = f["mygroup"]["inner_group"]["data"]["vec_1"][:]
mat_data = f["mygroup"]["data"]["mat"][:]
print(f"Time data: {time_data}")
print(f"Integer data: {integer_data}")
print(f"Vector element 0 data: {vec_0_data}")
print(f"Vector element 1 data: {vec_1_data}")
print(f"Matrix data: {mat_data}")
matrix_data_2 = f["mygroup"]["inner_group"]["data"]["mat"][:]
Time data: [0.1 0.2 0.3]
Integer data: [2 3 4]
Vector element 0 data: [1. 2. 3.]
Vector element 1 data: [2. 4. 6.]
Matrix data: [[[2. 0.]
[0. 0.]
[0. 0.]]
[[4. 0.]
[0. 0.]
[0. 0.]]
[[6. 0.]
[0. 0.]
[0. 0.]]
[[7. 0.]
[0. 0.]
[0. 0.]]
[[8. 0.]
[0. 0.]
[0. 0.]]
[[9. 0.]
[0. 0.]
[0. 0.]]]
Simulation Data Logging#
Now we demonstrate automatically logging sim data using a DataLogger model. To do so, we setup a simulation, configure our writer, attach it to our state propagator, and run the simulation.
Setup a 2-link Pendulum#
We procedurally generate a 2-link pendulum multibody. It is a simpler way of creating the pendulum but has the same functionality as manual creation. You can read more about procedurally generating a multibody in our \(n\)-link pendulum tutorial
See Multibody or Frames for more information relating to this step.
fc = FrameContainer("root")
def createMbody(n_links: int):
"""Create the multibody.
Parameters
----------
n_links : int
The number of pendulum links to use.
"""
mb = Multibody("mb", fc)
mat_info = PhysicalMaterialInfo()
mat_info.color = Color.FIREBRICK
brown = PhysicalMaterial(mat_info)
sp_body = ScenePartSpec()
sp_body.name = "sp"
sp_body.geometry = BoxGeometry(0.05, 0.05, 1)
sp_body.material = brown
sp_body.transform = HomTran([0.0, 0.0, 0.0])
sp_body.scale = [1, 1, 1]
sp_pivot = ScenePartSpec()
sp_pivot.name = "sp"
sp_pivot.geometry = CylinderGeometry(0.1, 0.1)
sp_pivot.material = brown
sp_pivot.transform = HomTran([0.0, 0.0, -0.5])
sp_pivot.scale = [1, 1, 1]
params = PhysicalBodyParams(
spI=SpatialInertia(2.0, np.zeros(3), np.diag([3, 2, 1])),
axes=[np.array([0.0, 1.0, 0.0])],
body_to_joint_transform=HomTran(np.array([0, 0, 0.5])),
inb_to_joint_transform=HomTran(np.array([0, 0, -0.5])),
scene_part_specs=[sp_body, sp_pivot],
)
PhysicalBody.addSerialChain(
"link", n_links, cast(PhysicalBody, mb.virtualRoot()), htype=HINGE_TYPE.PIN, params=params
)
# finalize and verify multibody
mb.ensureCurrent()
mb.resetData()
assert allFinalized()
return mb
In this cell we create the multibody by calling the createMbody we defined in the previous cell. We also obtain the two pendulum bodies and assign them to variables
# initialization
n_links = 2
mb = createMbody(n_links)
Next we setup kdflex’s graphics by calling the setupGraphics helper method on the multibody. This method takes care of setting up the graphics environment.
See Visualization and SceneLayer for more information relating to this section.
cleanup_graphics, web_scene = mb.setupGraphics(port=0, axes=0.0)
# position the viewpoint camera of the visualization
web_scene.defaultCamera().pointCameraAt(
[0.0, 5.0 + n_links / 2, -n_links / 2], [0, 0, -n_links / 2], [0, 0, 1]
)
proxy_scene = mb.getScene()
[WebUI] Listening at http://newton:33623
Now, we setup the Karana.Dynamics.StatePropagator and register our models as well.
See Models for more concepts and information.
# Set up state propagator and select integrator type: rk4 or cvode
sp = StatePropagator(mb, integrator_type=IntegratorType.RK4)
integrator = sp.getIntegrator()
sp_opts = sp.getOptions()
# add a gravitational model to the state propagator
ug = UniformGravity("grav_model", sp, mb)
ug.params.g = np.array([0, 0, -9.81])
del ug
# Makes sure the visualization scene is updated after each state change.
UpdateProxyScene("update_proxy_scene", sp, proxy_scene)
# sync the simulation time with real-time.
SyncRealTime("sync_real_time", sp, 1.0)
def post_step_fn(t, x):
print(f"t = {float(integrator.getTime())/1e9}s; x = {integrator.getX()}")
sp_opts.fns.post_hop_fns["update_and_info"] = post_step_fn
Before we begin our simulation, we set the initial state for our multibody and statepropagator.
When accessing or modifying generalized coordinates for a subhinge, it is recommended to directly set the subhinge’s values rather than for the entire multibody in order to avoid ambiguity.
# Modify the initial multibody state.
# Here we will set the first pendulum position to pi/8 radians and its velocity to 0.0
bd1 = mb.getBody("link_0")
bd1.parentHinge().subhinge(0).setQ([pi / 8])
bd1.parentHinge().subhinge(0).setU([0.0])
# Initialize state. Note that in kdflex there are two states, one for multibody and one for integrator.
# Here we get the multibody state vector we updated above and then feed it to the integrator.
t_init = np.timedelta64(0, "ns")
x_init = mb.dynamicsToState()
sp.setTime(t_init)
sp.setState(x_init)
# sync graphics
proxy_scene.update()
To execute our callback functions every 0.1 seconds, we create a timed event and register it to our state propagator here. See timed events for more information.
h = np.timedelta64(int(1e8), "ns")
t = TimedEvent("hop_size", h, lambda _: None, False)
t.period = h
# register the timed event
sp.registerTimedEvent(t)
del t
Attach a DataLogger Model#
First, we setup our H5Writer instance and define the functions that we use to log data.
# create packet table configs and add data
# adding time
c1 = PacketTableConfig("time_1")
c1.addDataFloat("t", lambda: float(sp.getTime()) / 1e9)
c2 = PacketTableConfig("time_2")
c2.addDataFloat("t", lambda: float(sp.getTime()) / 1e9)
# adding state
c3 = PacketTableConfig("state")
c3.addData("state", lambda: np.array(integrator.getX()), n_links * 2)
# adding relative velocity between body frames
c4 = PacketTableConfig("rel_vel")
bd1 = mb.getBody("link_0")
bd2 = mb.getBody("link_1")
pendulum_end_to_end = bd1.frame2Frame(bd2)
c4.addData("rel_vel", lambda: np.array(pendulum_end_to_end.relSpVel().getv()), 3)
# create the H5 write and tables
h5_writer = H5Writer("log.h5")
h5_writer.createTable(c1)
h5_writer.createTable(c2)
h5_writer.createTable(c3)
h5_writer.createTable(c4)
Now we create the Karana.Models.DataLogger using the H5Writer and set its data logging frequency. We then verify the DataLogger and H5Writer is finalized and register the DataLogger to the state propagator.
dl = DataLogger("wsm", sp, h5_writer)
dl.params.log_first_step = True
dl.setPeriod(0.1)
print(f"Data Logger is Finalized: {dl.isFinalized()}")
Data Logger is Finalized: True
Run the Simulation#
# This should log both times for 0.0, 0.1, and 0.2
sp.advanceTo(0.2)
# This should log only time_1 for 0.3
h5_writer.deactivateTable("time_2")
sp.advanceTo(0.3)
# This should log nothing for time up to 1.0
sp.unregisterModel(dl)
sp.advanceTo(1.0)
# This should log only at 1.1, not 1.0
h5_writer.activateTable("time_2")
dl.params.log_first_step = False
sp.registerModel(dl)
sp.advanceTo(1.1)
t = 0.1s; x = [ 0.38085961 0.00907525 -0.23551005 0.17987599]
t = 0.2s; x = [ 0.34610386 0.03532941 -0.45583893 0.34039766]
t = 0.3s; x = [ 0.29068861 0.07587749 -0.64636777 0.46275047]
t = 0.4s; x = [ 0.21827047 0.1260333 -0.79389321 0.5300487 ]
t = 0.5s; x = [ 0.13370607 0.17961689 -0.8878367 0.52975609]
t = 0.6s; x = [ 0.04272369 0.22952563 -0.92153824 0.45633623]
t = 0.7s; x = [-0.04851943 0.26852565 -0.89317787 0.31280251]
t = 0.8s; x = [-0.13393852 0.29010315 -0.80599992 0.11031439]
t = 0.9s; x = [-0.2080092 0.28918536 -0.66784444 -0.13388806]
t = 1.0s; x = [-0.26618358 0.26261765 -0.49022784 -0.39909194]
t = 1.1s; x = [-0.30520321 0.20939939 -0.28723186 -0.66320243]
<SpStatusEnum.REACHED_END_TIME: 1>
Read Logging Output#
Let’s now view the results from the simulation. Make sure to delete the H5 writer and TablePacketConfigs to unlock the file.
del c1, c2, c3, c4, h5_writer, integrator, sp_opts, dl
discard(sp)
# load the file
file = h5py.File("log.h5", "r")
file.keys()
<KeysViewHDF5 ['rel_vel', 'state', 'time_1', 'time_2']>
# Output state data
state_data = file["state"][:]
print(f"State Vector: {state_data}")
# Output time data
time_data = file["time_1"][:]
print(f"Time1 Data: {time_data}")
# Output time data 2
time_data_2 = file["time_2"][:]
print(f"Time2 Data: {time_data_2}")
# Output relative velocity data
rel_vel_data = file["rel_vel"][:]
print(f"Relative Velocity Data: {rel_vel_data}")
State Vector: [([ 0.39269908, 0. , 0. , 0. ],)
([ 0.38085961, 0.00907525, -0.23551005, 0.17987599],)
([ 0.34610386, 0.03532941, -0.45583893, 0.34039766],)
([ 0.29068861, 0.07587749, -0.64636777, 0.46275047],)
([-0.30520321, 0.20939939, -0.28723186, -0.66320243],)]
Time1 Data: [(0. ,) (0.1,) (0.2,) (0.3,) (1.1,)]
Time2 Data: [(0. ,) (0.1,) (0.2,) (1.1,)]
Relative Velocity Data: [([ 0. , 0. , 0. ],)
([-0.08993429, 0. , 0.0008162 ],)
([-0.17009262, 0. , 0.00601177],)
([-0.23070949, 0. , 0.01753933],)
([ 0.3243577 , 0. , -0.06893075],)]
Clean Up the Simulation#
def cleanup():
global bd1, bd2, web_scene, proxy_scene
del bd1, bd2, web_scene, proxy_scene
cleanup_graphics()
discard(mb)
discard(fc)
atexit.register(cleanup)
<function __main__.cleanup()>
Summary#
You are now able to store both non-simulation and simulation data to a H5 file and read its contents back later! We covered setting up PacketTableConfigs and attaching logging functions, creating our H5Writer, and registering a DataLogger model.