Data Logging#

This notebook explores kdflex’s built in data logging capabilities using a 2-link pendulum example multibody. We start with generic non-simulation data logging to H5 files using Karana.KUtils.H5Writer. Then we will demonstrate automatically logging sim data using the Karana.Models.DataLogger. This requires going through the typical process of setting up a multibody and running the simulation.

Requirements:

In this tutorial we will:

  • Setup the PacketTables

  • Log using H5Writer

  • Read the H5 file using H5py

  • Setup a 2-link pendulum

  • Attach a DataLogger model

  • Run the simulation

  • Read logging output

  • Clean up the simulation

For a more in-depth descriptions of kdflex concepts see usage.

import numpy as np
import atexit
from typing import cast
from math import pi
from Karana.Core import discard, allReady
from Karana.Frame import FrameContainer
from Karana.Math import IntegratorType
from Karana.Dynamics import (
    Multibody,
    PhysicalBody,
    PhysicalBody,
    HingeType,
    StatePropagator,
    TimedEvent,
    PhysicalBodyParams,
)
from Karana.Math import SpatialInertia, HomTran
from Karana.Models import Gravity, UniformGravity, UpdateProxyScene, SyncRealTime, OutputUpdateType
from Karana.Scene import (
    BoxGeometry,
    CylinderGeometry,
    Color,
    PhysicalMaterialInfo,
    PhysicalMaterial,
    ScenePartSpec,
)
from Karana.Models import DataLogger
from Karana.KUtils import H5Writer, PacketTableConfig

Generic Non-Simulation Data Logging#

We don’t necessarily need to be running a simulation to use kdflex’s logging capabilities. The following cells demonstrate how to use kdflex’s data logging capabilities without running a simulation. See later in the notebook for how to log data from a simulation.

Setup the PacketTables#

PacketTables are the tables that store data in the H5 log file.We use Karana.KUtils.PacketTableConfig to define the columns and associated update functions for a PacketTable.

#  Here we create two packet tables configs which we will use to create the tables to store data in
#  We use two tables here. This tests whether we can use a group that already exists, i.e., mygroup will
# already be created by c and should be re-used by c2.
c_gen = PacketTableConfig("mygroup/inner_group/data")
c2_gen = PacketTableConfig("mygroup/data")

For our case we next create functions for tracking example values. Each time these functions are called the data updates, simulating what our values might look like in a simulation. This is just to simulate what some sort of value we are interested in logging will look like. These functions are then added to the packet table configs.

# float
t = 0.0


def time():
    """Return current time.

    Returns
    -------
    float
        The current time.
    """
    global t
    t += 0.1
    return t


# adding float data to be recorded in the table
c_gen.addDataFloat("time", time)

# integer
i = 1


def int_f() -> int:
    """Return an integer that increases by one each time this is called.

    Returns
    -------
    int
        The current value of the integer.
    """
    global i
    i += 1
    return i


# adding integer data to be recorded in the table
c_gen.addDataInt("int", int_f)

# dynamic vector
v = np.zeros(2)


def vec():
    """Return a vector.

    The first component increases by 1 and the second component by 2 each time this is called.

    Returns
    -------
    NDArray[np.float64]
        The current vector value.
    """
    global v
    v[0] += 1.0
    v[1] += 2.0
    return v


# adding a dynamic vector to be recorded in the table
c_gen.addData("vec", vec, 2, as_scalars=True)

# dynamic matrix
m = np.zeros((3, 2))


def mat():
    """Return a matrix.

    The first component increases by 1 each time this is called.

    Returns
    -------
    NDArray[np.float64]
        The current matrix value.
    """
    global m
    m[0, 0] += 1.0
    return m


# adding a dynamic matrix to be recorded in both table configs
c_gen.addData("mat", mat, 3, 2)
c2_gen.addData("mat", mat, 3, 2)

Log Using H5Writer#

We now create the Karana.KUtils.H5Writer and specify a file to output data to. We create the packet tables from the packet table config and then log the data by calling Karana.KUtils.H5Writer.log().

# create logger and tables
h5 = H5Writer("example.h5")
h5.createTable(c_gen)
h5.createTable(c2_gen)

# log data three times
h5.log()
h5.log()
h5.log()

Below we demonstrate table naming and how to deactivate an active table. Logging will only add data to active tables.

# check active table names
print(f"Active table names: {", ".join(h5.getActiveTableNames())}")

# deactivate table
h5.deactivateTable("mygroup/inner_group/data")

# check active table names
print(f"Active table names: {", ".join(h5.getActiveTableNames())}")

# log only active tables
h5.log()

# logging an inactive table via logTable
h5.deactivateTable("mygroup/data")
h5.logTable("mygroup/data")

# activate a new table
h5.activateTable("mygroup/data")
h5.log()
Active table names: mygroup/inner_group/data, mygroup/data
Active table names: mygroup/data

Read the H5 File Using h5py#

Let’s view the data we just logged. In order to do this we need to delete the H5 writer so that it releases its lock on the HDF5 file. We also need to delete the corresponding PacketTableConfig in order to properly delete the H5 writer.

We then specify the output file we want to read, and print out the keys for the data.

import h5py

del c_gen, c2_gen, h5

# load the file
f = h5py.File("example.h5", "r")
f.keys()
<KeysViewHDF5 ['mygroup']>

The HDF5 is structured in data groups which we specified earlier. The following cells depict how to parse and unpack this data.

# Data can be accessed like a dictionary using keys.
# The following print statements illustrate the
# structure of the hdf5 file and how to access the data

# Show keys for mygroup
print(f["mygroup"].keys())
print(f["mygroup"]["data"].dtype)

matrix_data = f["mygroup"]["data"]["mat"][:]

# Show keys for inner_group
print(f["mygroup"]["inner_group"].keys())
dtype = f["mygroup"]["inner_group"]["data"].dtype
print(f"Data types logged in the inner group: {dtype}")
<KeysViewHDF5 ['data', 'inner_group']>
[('mat', '<f8', (3, 2))]
<KeysViewHDF5 ['data']>
Data types logged in the inner group: [('time', '<f8'), ('int', '<i4'), ('vec_0', '<f8'), ('vec_1', '<f8'), ('mat', '<f8', (3, 2))]
# Print out logged data
time_data = f["mygroup"]["inner_group"]["data"]["time"][:]
integer_data = f["mygroup"]["inner_group"]["data"]["int"][:]
vec_0_data = f["mygroup"]["inner_group"]["data"]["vec_0"][:]
vec_1_data = f["mygroup"]["inner_group"]["data"]["vec_1"][:]
mat_data = f["mygroup"]["data"]["mat"][:]
print(f"Time data: {time_data}")
print(f"Integer data: {integer_data}")
print(f"Vector element 0 data: {vec_0_data}")
print(f"Vector element 1 data: {vec_1_data}")
print(f"Matrix data: {mat_data}")

matrix_data_2 = f["mygroup"]["inner_group"]["data"]["mat"][:]
Time data: [0.1 0.2 0.3]
Integer data: [2 3 4]
Vector element 0 data: [1. 2. 3.]
Vector element 1 data: [2. 4. 6.]
Matrix data: [[[2. 0.]
  [0. 0.]
  [0. 0.]]

 [[4. 0.]
  [0. 0.]
  [0. 0.]]

 [[6. 0.]
  [0. 0.]
  [0. 0.]]

 [[7. 0.]
  [0. 0.]
  [0. 0.]]

 [[8. 0.]
  [0. 0.]
  [0. 0.]]

 [[9. 0.]
  [0. 0.]
  [0. 0.]]]

Simulation Data Logging#

Now we demonstrate automatically logging sim data using a DataLogger model. To do so, we setup a simulation, configure our writer, attach it to our state propagator, and run the simulation.

Attach a DataLogger Model#

First, we setup our H5Writer instance and define the functions that we use to log data.

# create packet table configs and add data

# adding time
c1 = PacketTableConfig("time_1")
c1.addDataFloat("t", lambda: float(sp.getTime()) / 1e9)
c2 = PacketTableConfig("time_2")
c2.addDataFloat("t", lambda: float(sp.getTime()) / 1e9)

# adding state
c3 = PacketTableConfig("state")
c3.addData("state", lambda: np.array(integrator.getX()), n_links * 2)

# adding relative velocity between body frames
c4 = PacketTableConfig("rel_vel")
bd1 = mb.getBody("link_0")
bd2 = mb.getBody("link_1")
pendulum_end_to_end = bd1.frameToFrame(bd2)
c4.addData("rel_vel", lambda: np.array(pendulum_end_to_end.relSpVel().getv()), 3)

# create the H5 write and tables
h5_writer = H5Writer("log.h5")
h5_writer.createTable(c1)
h5_writer.createTable(c2)
h5_writer.createTable(c3)
h5_writer.createTable(c4)

Now we create the Karana.Models.DataLogger using the H5Writer and set its data logging frequency. We then verify the DataLogger and H5Writer is ready and register the DataLogger to the state propagator.

dl = DataLogger("wsm", sp, h5_writer)
dl.params.log_first_step = True
dl.setPeriod(0.1)

print(f"Data Logger is Finalized: {dl.isReady()}")
Data Logger is Finalized: True

Run the Simulation#

# This should log both times for 0.0, 0.1, and 0.2
sp.advanceTo(0.2)

# This should log only time_1 for 0.3
h5_writer.deactivateTable("time_2")
sp.advanceTo(0.3)

# This should log nothing for time up to 1.0
sp.unregisterModel(dl)
sp.advanceTo(1.0)

# This should log only at 1.1, not 1.0
h5_writer.activateTable("time_2")
dl.params.log_first_step = False
sp.registerModel(dl)
sp.advanceTo(1.1)
t = 0.1s; x = [ 0.38085961  0.00907525 -0.23551005  0.17987599]
t = 0.2s; x = [ 0.34610386  0.03532941 -0.45583893  0.34039766]
t = 0.3s; x = [ 0.29068861  0.07587749 -0.64636777  0.46275047]
t = 0.4s; x = [ 0.21827047  0.1260333  -0.79389321  0.5300487 ]
t = 0.5s; x = [ 0.13370607  0.17961689 -0.8878367   0.52975609]
t = 0.6s; x = [ 0.04272369  0.22952563 -0.92153824  0.45633623]
t = 0.7s; x = [-0.04851943  0.26852565 -0.89317787  0.31280251]
t = 0.8s; x = [-0.13393852  0.29010315 -0.80599992  0.11031439]
t = 0.9s; x = [-0.2080092   0.28918536 -0.66784444 -0.13388806]
t = 1.0s; x = [-0.26618358  0.26261765 -0.49022784 -0.39909194]
t = 1.1s; x = [-0.30520321  0.20939939 -0.28723186 -0.66320243]
<SpStatusEnum.REACHED_END_TIME: 1>

Read Logging Output#

Let’s now view the results from the simulation. Make sure to delete the H5 writer and PacketTableConfigs to unlock the file.

del c1, c2, c3, c4, h5_writer, integrator, dl
discard(sp)

# load the file
file = h5py.File("log.h5", "r")
file.keys()
<KeysViewHDF5 ['rel_vel', 'state', 'time_1', 'time_2']>
# Output state data
state_data = file["state"][:]
print(f"State Vector: {state_data}")

# Output time data
time_data = file["time_1"][:]
print(f"Time1 Data: {time_data}")

# Output time data 2
time_data_2 = file["time_2"][:]
print(f"Time2 Data: {time_data_2}")

# Output relative velocity data
rel_vel_data = file["rel_vel"][:]
print(f"Relative Velocity Data: {rel_vel_data}")
State Vector: [([ 0.39269908,  0.        ,  0.        ,  0.        ],)
 ([ 0.38085961,  0.00907525, -0.23551005,  0.17987599],)
 ([ 0.34610386,  0.03532941, -0.45583893,  0.34039766],)
 ([ 0.29068861,  0.07587749, -0.64636777,  0.46275047],)
 ([-0.30520321,  0.20939939, -0.28723186, -0.66320243],)]
Time1 Data: [(0. ,) (0.1,) (0.2,) (0.3,) (1.1,)]
Time2 Data: [(0. ,) (0.1,) (0.2,) (1.1,)]
Relative Velocity Data: [([ 0.        ,  0.        ,  0.        ],)
 ([-0.08993429,  0.        ,  0.0008162 ],)
 ([-0.17009262,  0.        ,  0.00601177],)
 ([-0.23070949,  0.        ,  0.01753933],)
 ([ 0.3243577 ,  0.        , -0.06893075],)]

Clean Up the Simulation#

def cleanup():
    """Cleanup the simulation."""
    global bd1, bd2, web_scene, proxy_scene
    del bd1, bd2, web_scene, proxy_scene

    cleanup_graphics()

    discard(mb)
    discard(fc)


atexit.register(cleanup)
<function __main__.cleanup()>

Summary#

You are now able to store both non-simulation and simulation data to a H5 file and read its contents back later! We covered setting up PacketTableConfigs and attaching logging functions, creating our H5Writer, and registering a DataLogger model.