Karana.KUtils

Contents

Karana.KUtils#

Simulation utilities such as data logging and data plotting.

Submodules#

Classes#

FSM

A class to implement finite state machines.

FSMState

Represents a state in an FSM.

FSMTransition

An edge in the graph that makes up an FSM.

FSMVars

Vars for the FSM class.

H5Writer

Logs data to an HDF5 file. The data to be logged is configured

PacketTableConfig

Define the columns and associated update functions for a PacketTable.

PlotData

SinglePlotData

PythonReentryMonitor

Monitor when you re-enter Python from C++.

Package Contents#

class Karana.KUtils.FSM(name: str)#

Bases: Karana.Core.Base

A class to implement finite state machines.

The finite state machine is implemented as a graph that connects FSMStates with FSMTransitions.

static create(name: str) FSM#

Create a new instance of FSM.

Parameters:

name – The name of the FSM.

Returns:

A pointer to the newly created instance of FSM.

getCurrentState() FSMState#

Get the current state.

Returns:

The current FSMState.

getState(name: str) FSMState#

Get the state with the provided name.

Parameters:

name – The name of the state to retrieve.

Returns:

The FSMState with the provided name.

getStates() list[FSMState]#

Get a list of all the FSMStates in this FSM.

Returns:

A list of all the FSMStates in this FSM.

getTraceFSM() bool#

Get whether this FSM is printing trace messages or not.

Returns:

true if trace messages are being printed, false otherwise.

getTransition(parent_state: str, child_state: str) FSMTransition#

Get the transition that connects the two states with the provided names.

Parameters:
  • parent_state – The name of the parent of the edge.

  • child_state – The name of the child of the edge.

Returns:

The transition that connects the parent and child states with the names provided.

getVars() FSMVars#
setCurrentState(state_name: str) None#
setCurrentState(state: FSMState) None

Set the current state of this FSM to the state with the provided name.

Parameters:

name – The name of the state to set the FSM to.

setTraceFSM(arg0: bool) None#

Set whether this FSM prints trace messages or not.

Parameters:

trace_fsm – true to set the FSM to print trace messages, false to stop printing trace messages.

update() None#

Update the FSM.

This updates the current state using the should_transition_fns of the FSMTransitions for the current state.

property on_state_transition_fns: _FSMTransitionCallbackRegistry#

These functions will run whenever we transition to a new state

createFSMGraphVis(port: int = 0, network_options: Karana.KUtils.visjs.NetworkOptions = NetworkOptions()) Karana.KUtils.visjs.FSMGraphServer#

Create a visjs graph for this FSM.

This also registers a method to update the graph as the FSM updates.

Parameters:
  • port (int) – The port to run the server on.

  • network_options (NetworkOptions) – The network options for the graph.

Returns:

The visjs GraphServer for this FSM.

Return type:

FSMGraphServer

class Karana.KUtils.FSMState(name: str, fsm: FSM)#

Bases: Karana.Core.Base

Represents a state in an FSM.

This is a node in the graph that makes up an FSM.

static create(name: str, fsm: FSM) FSMState#

Create a new instance of FSMState.

Parameters:
  • name – The name of the FSMState. The names for states in an FSM must be unique.

  • fsm – The FSM that this FSMState will be a part of.

Returns:

A pointer to the newly created instance of FSMState.

getChildTransitions() list[FSMTransition]#

Get all the transitions that connect this state to its children.

Returns:

All the transitions that connect this state to its children.

getParentTransitions() list[FSMTransition]#

Get all the transitions that connect this state to its parents.

Returns:

All the transitions that connect this state to its parents.

property during_fns: Karana.Core.VoidCallbackRegistry#

Functions that execute during the state

property on_enter_fns: Karana.Core.VoidCallbackRegistry#

Functions that execute when entering the state

property on_exit_fns: Karana.Core.VoidCallbackRegistry#

Functions that execute when exiting the state

class Karana.KUtils.FSMTransition(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool])#

Bases: Karana.Core.Base

An edge in the graph that makes up an FSM.

static create(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool]) FSMTransition#

Create a new instance of FSMTransition.

Parameters:
  • parent_state – The parent FSMState of the edge.

  • child_state – The child FSMState of the edge.

  • should_transition_fn – The function that indicates whether this state should transition or not. This should return true when it is time to transition, and false when it is not. The FSM.update method checks these functions to decide when to transition.

Returns:

A pointer to the newly created instance of FSMTransition.

getChildState() FSMState#

Get the child state for this edge.

Returns:

The child state for this edge.

getParentState() FSMState#

Get the parent state for this edge.

Returns:

The parent state for this edge.

property on_transition_fns: Karana.Core.VoidCallbackRegistry#

Functions that execute when transitioning on this edge

property should_transition_fn: collections.abc.Callable[[], bool]#

Function that indicates if we should transition. true for yes and false for no.

class Karana.KUtils.FSMVars#

Bases: Karana.Core.BaseVars

Vars for the FSM class.

property current_state: Karana.Core.VarString#

The current state of the FSM

class Karana.KUtils.H5Writer(filename: str)#

Bases: Karana.Core.Base

Logs data to an HDF5 file. The data to be logged is configured

via `PacketTableConfig`s.

static create(filename: str) H5Writer#

Create an H5Writer class.

Parameters:

filename – The name of the h5 file to create.

Returns:

A ks_ptr to the newly created instance of H5Writer.

activateTable(name: str) None#

Activate the table with the given name.

Parameters:

name – The name of the table to activate.

createTable(config: PacketTableConfig) None#

Create a packet table to log.

This creates the table and adds it to the active tables. Activate tables will have a data entry added whenever log is called. To deactivate a table use the deactivateTable method. The name of the tame will be the same as the name of the PacketTableConfig.

Parameters:

config – The config that specifies the data of the packet table.

deactivateTable(name: str) None#

Deactivate the table with the given name.

Parameters:

name – The name of the table to deactivate.

getActiveTableNames() list[str]#

Get a vector of the active table names.

Returns:

A vector of the active table names.

getTableNames() list[str]#

Get a vector of all the table names.

Returns:

A vector of the table names.

log() None#

Log data for all the active tables.

logTable(name: str) None#

Create a log entry for just the given table.

This will log the table regardless of whether it is active or not.

Parameters:

name – The name of the table to log.

class Karana.KUtils.PacketTableConfig(name: str)#

Bases: Karana.Core.LockingBase

Define the columns and associated update functions for a PacketTable.

The PacketTable is the table that stores data in the H5 log file.

static create(name: str) PacketTableConfig#

Create a packet table config.

Parameters:

name – Name of the table. A / will be interpreted as a new group. Groups will be added as needed.

Returns:

A ks_ptr to the newly created instance of PacketTableConfig.

addData(var: Karana.Core.VarVec, as_scalars: bool = False) None#
addData(var: Karana.Core.VarMat, as_scalars: bool = False) None
addData(var: Karana.Core.VarVec3, as_scalars: bool = False) None
addData(var: Karana.Core.VarVec6, as_scalars: bool = False) None
addData(var: Karana.Core.VarMat33, as_scalars: bool = False) None
addData(var: Karana.Core.VarMat66, as_scalars: bool = False) None
addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], vector_size: SupportsInt | SupportsIndex, as_scalars: bool = False) None
addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, n]]], matrix_rows: SupportsInt | SupportsIndex, matrix_cols: SupportsInt | SupportsIndex, as_scalars: bool = False) None
addData(var: Karana.Core.VarDouble) None
addData(var: Karana.Core.VarFloat) None
addData(var: Karana.Core.VarInt) None
addData(var: Karana.Core.VarShort) None
addData(var: Karana.Core.VarLong) None
addData(var: Karana.Core.VarLong) None

Add data to the table via a Var_T with loggable data.

Parameters:

var – The Var_T to add to the table.

addDataFloat(name: str, f: collections.abc.Callable[[], float]) None#

Add data to the table via function. :param name: The name of the column in the table. :param f: The function that produces the data.

addDataInt(name: str, f: collections.abc.Callable[[], int]) None#

Add data to the table via function. :param name: The name of the column in the table. :param f: The function that produces the data.

fillBuf() None#

Fill the buffer by calling all the functions.

class Karana.KUtils.PlotData(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str)#

Bases: Karana.Core.BaseWithVars

static create(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str) PlotData#

Create an instance of PlotData.

Parameters:
  • plot_data – The data for the plots.

  • host – The host for the websocket.

  • port – The port for the websocket.

  • target – The target for the websocket.

Returns:

A pointer to the newly created PlotData.

addPlot(plot_data: SinglePlotData) None#

Add a plot.

Parameters:

plot_data – The data for the plot to add.

removePlot(index: SupportsInt | SupportsIndex) None#

Remove the plot at index.

Parameters:

index – The index of the plot to remove.

sendWebsocketMessage(msg: str) None#

Send a message over the websocket. This should be used with caution and only if you know exactly what type of messages are allowed to be sent. :param msg: The message to send.

shutdown() None#

Shutdown the websocket client.

update() None#

Get data from the SinglePlotData and send it via a websocket to the server.

property plot_fns: list[SinglePlotData]#

The data for the plots.

class Karana.KUtils.SinglePlotData(title: str, x_data: Karana.Core.VarDouble | tuple[str, collections.abc.Callable[[], float]], y_data: collections.abc.Sequence[Karana.Core.VarDouble | Karana.Core.VarVec3 | Karana.Core.VarVec | Karana.Core.VarSpatialVector | Karana.Core.VarSpatialVelocity | Karana.Core.VarSpatialAcceleration | Karana.Core.VarSpatialForce | tuple[str, collections.abc.Callable[[], float]] | tuple[str, collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], SupportsInt | SupportsIndex]])#
property title: str#

The title of the plot

property x_data: collections.abc.Callable[[], float]#

The function that produces the x_data

property x_data_name: str#

The name of the x_data

property y_data_fns: list[collections.abc.Callable[[], float | Annotated[numpy.typing.NDArray[numpy.float64], [3, 1]] | Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]] | Karana.Math.SpatialVector | Karana.Math.SpatialVelocity | Karana.Math.SpatialAcceleration | Karana.Math.SpatialForce]]#

The y_data functions

property y_data_names: list[str]#

The y_data names

property y_data_sizes: list[int]#

The y_data function sizes

class Karana.KUtils.PythonReentryMonitor[source]#

Monitor when you re-enter Python from C++.

This class is designed to help you identify places in your simulation where you are re-entering the Python interpreter from C++. This can help identify places in your simulation where you can improve performance by migrating to C++.

traces = []#
trace(frame, event, _)[source]#

Trace the current line of Python code.

This will track any call event. This does not count recursive events, so it will only track call events made at the top-level.

Parameters:
  • frame – The Python frame.

  • event – The event being traced.

  • _ – arg, unused.

Return type:

The trace method

__enter__()[source]#

Use this as a context.

This will start monitoring all calls to Python in the context.

Returns:

This object.

Return type:

Self

__exit__(*_)[source]#

Exit the context.

See __enter__ for details.

Parameters:

*_ – Exit context parameters. Unused.

Returns:

True if this exited successfully.

Return type:

bool

dump()[source]#

Dump the traces captured by this monitor.

clear()[source]#

Clear out all the traces this monitor has captured.