Karana.KUtils#
Simulation utilities such as data logging and data plotting.
Submodules#
- Karana.KUtils.BasicPrefab
- Karana.KUtils.DataPlotter
- Karana.KUtils.DataStruct
- Karana.KUtils.FilesystemLikeDatabase
- Karana.KUtils.Kclick
- Karana.KUtils.MultibodyTUI
- Karana.KUtils.MultibodyWebUI
- Karana.KUtils.Prefab
- Karana.KUtils.PrefabInterfaces
- Karana.KUtils.Sim
- Karana.KUtils.darts_model_file_converter
- Karana.KUtils.multirun
- Karana.KUtils.ros
- Karana.KUtils.visjs
- Karana.KUtils.vizutils
Classes#
A class to implement finite state machines. |
|
Represents a state in an FSM. |
|
An edge in the graph that makes up an FSM. |
|
Vars for the FSM class. |
|
Logs data to an HDF5 file. The data to be logged is configured |
|
Define the columns and associated update functions for a PacketTable. |
|
Monitor when you re-enter Python from C++. |
Package Contents#
- class Karana.KUtils.FSM(name: str)#
Bases:
Karana.Core.BaseA class to implement finite state machines.
The finite state machine is implemented as a graph that connects FSMStates with FSMTransitions.
- static create(name: str) FSM#
Create a new instance of FSM.
- Parameters:
name – The name of the FSM.
- Returns:
A pointer to the newly created instance of FSM.
- getState(name: str) FSMState#
Get the state with the provided name.
- Parameters:
name – The name of the state to retrieve.
- Returns:
The FSMState with the provided name.
- getStates() list[FSMState]#
Get a list of all the FSMStates in this FSM.
- Returns:
A list of all the FSMStates in this FSM.
- getTraceFSM() bool#
Get whether this FSM is printing trace messages or not.
- Returns:
true if trace messages are being printed, false otherwise.
- getTransition(parent_state: str, child_state: str) FSMTransition#
Get the transition that connects the two states with the provided names.
- Parameters:
parent_state – The name of the parent of the edge.
child_state – The name of the child of the edge.
- Returns:
The transition that connects the parent and child states with the names provided.
- setCurrentState(state_name: str) None#
- setCurrentState(state: FSMState) None
Set the current state of this FSM to the state with the provided name.
- Parameters:
name – The name of the state to set the FSM to.
- setTraceFSM(arg0: bool) None#
Set whether this FSM prints trace messages or not.
- Parameters:
trace_fsm – true to set the FSM to print trace messages, false to stop printing trace messages.
- update() None#
Update the FSM.
This updates the current state using the should_transition_fns of the FSMTransitions for the current state.
- property on_state_transition_fns: _FSMTransitionCallbackRegistry#
These functions will run whenever we transition to a new state
- createFSMGraphVis(port: int = 0, network_options: Karana.KUtils.visjs.NetworkOptions = NetworkOptions()) Karana.KUtils.visjs.FSMGraphServer#
Create a visjs graph for this FSM.
This also registers a method to update the graph as the FSM updates.
- Parameters:
port (int) – The port to run the server on.
network_options (NetworkOptions) – The network options for the graph.
- Returns:
The visjs GraphServer for this FSM.
- Return type:
- class Karana.KUtils.FSMState(name: str, fsm: FSM)#
Bases:
Karana.Core.BaseRepresents a state in an FSM.
This is a node in the graph that makes up an FSM.
- static create(name: str, fsm: FSM) FSMState#
Create a new instance of FSMState.
- Parameters:
name – The name of the FSMState. The names for states in an FSM must be unique.
fsm – The FSM that this FSMState will be a part of.
- Returns:
A pointer to the newly created instance of FSMState.
- getChildTransitions() list[FSMTransition]#
Get all the transitions that connect this state to its children.
- Returns:
All the transitions that connect this state to its children.
- getParentTransitions() list[FSMTransition]#
Get all the transitions that connect this state to its parents.
- Returns:
All the transitions that connect this state to its parents.
- property during_fns: Karana.Core.VoidCallbackRegistry#
Functions that execute during the state
- property on_enter_fns: Karana.Core.VoidCallbackRegistry#
Functions that execute when entering the state
- property on_exit_fns: Karana.Core.VoidCallbackRegistry#
Functions that execute when exiting the state
- class Karana.KUtils.FSMTransition(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool])#
Bases:
Karana.Core.BaseAn edge in the graph that makes up an FSM.
- static create(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool]) FSMTransition#
Create a new instance of FSMTransition.
- Parameters:
parent_state – The parent FSMState of the edge.
child_state – The child FSMState of the edge.
should_transition_fn – The function that indicates whether this state should transition or not. This should return true when it is time to transition, and false when it is not. The FSM.update method checks these functions to decide when to transition.
- Returns:
A pointer to the newly created instance of FSMTransition.
- getChildState() FSMState#
Get the child state for this edge.
- Returns:
The child state for this edge.
- getParentState() FSMState#
Get the parent state for this edge.
- Returns:
The parent state for this edge.
- property on_transition_fns: Karana.Core.VoidCallbackRegistry#
Functions that execute when transitioning on this edge
- property should_transition_fn: collections.abc.Callable[[], bool]#
Function that indicates if we should transition. true for yes and false for no.
- class Karana.KUtils.FSMVars#
Bases:
Karana.Core.BaseVarsVars for the FSM class.
- property current_state: Karana.Core.VarString#
The current state of the FSM
- class Karana.KUtils.H5Writer(filename: str)#
Bases:
Karana.Core.Base- Logs data to an HDF5 file. The data to be logged is configured
via `PacketTableConfig`s.
- static create(filename: str) H5Writer#
Create an H5Writer class.
- Parameters:
filename – The name of the h5 file to create.
- Returns:
A ks_ptr to the newly created instance of H5Writer.
- activateTable(name: str) None#
Activate the table with the given name.
- Parameters:
name – The name of the table to activate.
- createTable(config: PacketTableConfig) None#
Create a packet table to log.
This creates the table and adds it to the active tables. Activate tables will have a data entry added whenever log is called. To deactivate a table use the deactivateTable method. The name of the tame will be the same as the name of the PacketTableConfig.
- Parameters:
config – The config that specifies the data of the packet table.
- deactivateTable(name: str) None#
Deactivate the table with the given name.
- Parameters:
name – The name of the table to deactivate.
- getActiveTableNames() list[str]#
Get a vector of the active table names.
- Returns:
A vector of the active table names.
- getTableNames() list[str]#
Get a vector of all the table names.
- Returns:
A vector of the table names.
- log() None#
Log data for all the active tables.
- logTable(name: str) None#
Create a log entry for just the given table.
This will log the table regardless of whether it is active or not.
- Parameters:
name – The name of the table to log.
- class Karana.KUtils.PacketTableConfig(name: str)#
Bases:
Karana.Core.LockingBase- Define the columns and associated update functions for a PacketTable.
The PacketTable is the table that stores data in the H5 log file.
- static create(name: str) PacketTableConfig#
Create a packet table config.
- Parameters:
name – Name of the table. A / will be interpreted as a new group. Groups will be added as needed.
- Returns:
A ks_ptr to the newly created instance of PacketTableConfig.
- addData(var: Karana.Core.VarVec, as_scalars: bool = False) None#
- addData(var: Karana.Core.VarMat, as_scalars: bool = False) None
- addData(var: Karana.Core.VarVec3, as_scalars: bool = False) None
- addData(var: Karana.Core.VarVec6, as_scalars: bool = False) None
- addData(var: Karana.Core.VarMat33, as_scalars: bool = False) None
- addData(var: Karana.Core.VarMat66, as_scalars: bool = False) None
- addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], vector_size: SupportsInt | SupportsIndex, as_scalars: bool = False) None
- addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, n]]], matrix_rows: SupportsInt | SupportsIndex, matrix_cols: SupportsInt | SupportsIndex, as_scalars: bool = False) None
- addData(var: Karana.Core.VarDouble) None
- addData(var: Karana.Core.VarFloat) None
- addData(var: Karana.Core.VarInt) None
- addData(var: Karana.Core.VarShort) None
- addData(var: Karana.Core.VarLong) None
- addData(var: Karana.Core.VarLong) None
Add data to the table via a Var_T with loggable data.
- Parameters:
var – The Var_T to add to the table.
- addDataFloat(name: str, f: collections.abc.Callable[[], float]) None#
Add data to the table via function. :param name: The name of the column in the table. :param f: The function that produces the data.
- addDataInt(name: str, f: collections.abc.Callable[[], int]) None#
Add data to the table via function. :param name: The name of the column in the table. :param f: The function that produces the data.
- fillBuf() None#
Fill the buffer by calling all the functions.
- class Karana.KUtils.PlotData(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str)#
Bases:
Karana.Core.BaseWithVars- static create(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str) PlotData#
Create an instance of PlotData.
- Parameters:
plot_data – The data for the plots.
host – The host for the websocket.
port – The port for the websocket.
target – The target for the websocket.
- Returns:
A pointer to the newly created PlotData.
- addPlot(plot_data: SinglePlotData) None#
Add a plot.
- Parameters:
plot_data – The data for the plot to add.
- removePlot(index: SupportsInt | SupportsIndex) None#
Remove the plot at index.
- Parameters:
index – The index of the plot to remove.
- sendWebsocketMessage(msg: str) None#
Send a message over the websocket. This should be used with caution and only if you know exactly what type of messages are allowed to be sent. :param msg: The message to send.
- shutdown() None#
Shutdown the websocket client.
- update() None#
Get data from the SinglePlotData and send it via a websocket to the server.
- property plot_fns: list[SinglePlotData]#
The data for the plots.
- class Karana.KUtils.SinglePlotData(title: str, x_data: Karana.Core.VarDouble | tuple[str, collections.abc.Callable[[], float]], y_data: collections.abc.Sequence[Karana.Core.VarDouble | Karana.Core.VarVec3 | Karana.Core.VarVec | Karana.Core.VarSpatialVector | Karana.Core.VarSpatialVelocity | Karana.Core.VarSpatialAcceleration | Karana.Core.VarSpatialForce | tuple[str, collections.abc.Callable[[], float]] | tuple[str, collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], SupportsInt | SupportsIndex]])#
- property title: str#
The title of the plot
- property x_data: collections.abc.Callable[[], float]#
The function that produces the x_data
- property x_data_name: str#
The name of the x_data
- property y_data_fns: list[collections.abc.Callable[[], float | Annotated[numpy.typing.NDArray[numpy.float64], [3, 1]] | Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]] | Karana.Math.SpatialVector | Karana.Math.SpatialVelocity | Karana.Math.SpatialAcceleration | Karana.Math.SpatialForce]]#
The y_data functions
- property y_data_names: list[str]#
The y_data names
- property y_data_sizes: list[int]#
The y_data function sizes
- class Karana.KUtils.PythonReentryMonitor[source]#
Monitor when you re-enter Python from C++.
This class is designed to help you identify places in your simulation where you are re-entering the Python interpreter from C++. This can help identify places in your simulation where you can improve performance by migrating to C++.
- traces = []#
- trace(frame, event, _)[source]#
Trace the current line of Python code.
This will track any call event. This does not count recursive events, so it will only track call events made at the top-level.
- Parameters:
frame – The Python frame.
event – The event being traced.
_ – arg, unused.
- Return type:
The trace method
- __enter__()[source]#
Use this as a context.
This will start monitoring all calls to Python in the context.
- Returns:
This object.
- Return type:
Self