Karana.KUtils
=============

.. py:module:: Karana.KUtils

.. autoapi-nested-parse::

   Simulation utilities such as data logging and data plotting.



Submodules
----------

.. toctree::
   :maxdepth: 1

   /generated/python_api/Karana/KUtils/BasicPrefab/index
   /generated/python_api/Karana/KUtils/DataPlotter/index
   /generated/python_api/Karana/KUtils/DataStruct/index
   /generated/python_api/Karana/KUtils/FilesystemLikeDatabase/index
   /generated/python_api/Karana/KUtils/Kclick/index
   /generated/python_api/Karana/KUtils/MultibodyTUI/index
   /generated/python_api/Karana/KUtils/MultibodyWebUI/index
   /generated/python_api/Karana/KUtils/Prefab/index
   /generated/python_api/Karana/KUtils/PrefabInterfaces/index
   /generated/python_api/Karana/KUtils/Sim/index
   /generated/python_api/Karana/KUtils/darts_model_file_converter/index
   /generated/python_api/Karana/KUtils/multirun/index
   /generated/python_api/Karana/KUtils/ros/index
   /generated/python_api/Karana/KUtils/visjs/index
   /generated/python_api/Karana/KUtils/vizutils/index


Classes
-------

.. autoapisummary::

   Karana.KUtils.FSM
   Karana.KUtils.FSMState
   Karana.KUtils.FSMTransition
   Karana.KUtils.FSMVars
   Karana.KUtils.H5Writer
   Karana.KUtils.PacketTableConfig
   Karana.KUtils.PlotData
   Karana.KUtils.SinglePlotData
   Karana.KUtils.PythonReentryMonitor


Package Contents
----------------

.. py:class:: FSM(name: str)

   Bases: :py:obj:`Karana.Core.Base`


   A class to implement finite state machines.

   The finite state machine is implemented as a graph that connects
   FSMStates with FSMTransitions.
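
   A minimal usage sketch built only from the signatures documented on this
   page. The state names, the ``remaining`` counter, and the helper function
   names are illustrative assumptions, and the import is deferred so the
   snippet can be loaded without Karana installed:

   .. code-block:: python

      def build_countdown_fsm(remaining: dict):
          """Build a two-state FSM that moves from "counting" to "done".

          ``remaining`` is a hypothetical dict such as ``{"n": 3}`` owned by
          the caller; the edge is taken once ``remaining["n"]`` reaches zero.
          """
          # Deferred import: only needed when the function is actually called.
          from Karana.KUtils import FSM, FSMState, FSMTransition

          fsm = FSM.create("countdown")
          counting = FSMState.create("counting", fsm)
          done = FSMState.create("done", fsm)

          # The predicate has no side effects: it only reports whether the
          # edge should be taken when FSM.update checks it.
          FSMTransition.create(counting, done, lambda: remaining["n"] <= 0)

          fsm.setCurrentState("counting")
          return fsm


      def run_countdown(fsm, remaining: dict) -> None:
          """Decrement the counter and update the FSM until it reaches zero."""
          while remaining["n"] > 0:
              remaining["n"] -= 1
              fsm.update()

   Calling ``run_countdown(build_countdown_fsm(counter), counter)`` with
   ``counter = {"n": 3}`` performs three updates; ``getCurrentState`` can
   then be used to inspect where the FSM ended up.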


   .. py:method:: create(name: str) -> FSM
      :staticmethod:


      Create a new instance of FSM.

      :param name: The name of the FSM.

      :returns: A pointer to the newly created instance of FSM.



   .. py:method:: getCurrentState() -> FSMState

      Get the current state.

      :returns: The current FSMState.



   .. py:method:: getState(name: str) -> FSMState

      Get the state with the provided name.

      :param name: The name of the state to retrieve.

      :returns: The FSMState with the provided name.



   .. py:method:: getStates() -> list[FSMState]

      Get a list of all the FSMStates in this FSM.

      :returns: A list of all the FSMStates in this FSM.



   .. py:method:: getTraceFSM() -> bool

      Get whether this FSM is printing trace messages or not.

      :returns: ``True`` if trace messages are being printed, ``False`` otherwise.



   .. py:method:: getTransition(parent_state: str, child_state: str) -> FSMTransition

      Get the transition that connects the two states with the provided
      names.

      :param parent_state: The name of the parent of the edge.
      :param child_state: The name of the child of the edge.

      :returns: The transition that connects the parent and child states with the
                names provided.



   .. py:method:: getVars() -> FSMVars

      Get the vars for this FSM.

      :returns: The FSMVars for this FSM.



   .. py:method:: setCurrentState(state_name: str) -> None
                  setCurrentState(state: FSMState) -> None

      Set the current state of this FSM, given either the name of the state
      or the FSMState itself.

      :param state_name: The name of the state to set the FSM to.
      :param state: The FSMState to set the FSM to.



   .. py:method:: setTraceFSM(arg0: bool) -> None

      Set whether this FSM prints trace messages or not.

      :param arg0: ``True`` to print trace messages, ``False`` to stop printing
                   them.



   .. py:method:: update() -> None

      Update the FSM.

      This updates the current state using the should_transition_fns of the
      FSMTransitions for the current state.



   .. py:property:: on_state_transition_fns
      :type: _FSMTransitionCallbackRegistry


      Functions that run whenever the FSM transitions to a new state.


   .. py:method:: createFSMGraphVis(port: int = 0, network_options: Karana.KUtils.visjs.NetworkOptions = NetworkOptions()) -> Karana.KUtils.visjs.FSMGraphServer

      Create a visjs graph for this FSM.

      This also registers a method to update the graph as the FSM updates.

      :param port: The port to run the server on.
      :type port: int
      :param network_options: The network options for the graph.
      :type network_options: NetworkOptions

      :returns: The visjs GraphServer for this FSM.
      :rtype: FSMGraphServer



.. py:class:: FSMState(name: str, fsm: FSM)

   Bases: :py:obj:`Karana.Core.Base`


   Represents a state in an FSM.

   This is a node in the graph that makes up an FSM.


   .. py:method:: create(name: str, fsm: FSM) -> FSMState
      :staticmethod:


      Create a new instance of FSMState.

      :param name: The name of the FSMState. The names for states in an FSM
                   must be unique.
      :param fsm: The FSM that this FSMState will be a part of.

      :returns: A pointer to the newly created instance of FSMState.



   .. py:method:: getChildTransitions() -> list[FSMTransition]

      Get all the transitions that connect this state to its children.

      :returns: All the transitions that connect this state to its children.
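
      A small sketch of walking the graph with these accessors. The helper
      name ``successor_states`` is hypothetical; it relies only on
      ``getChildTransitions`` and ``FSMTransition.getChildState``:

      .. code-block:: python

         def successor_states(state):
             """Return the states reachable from ``state`` in one transition."""
             return [edge.getChildState() for edge in state.getChildTransitions()]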



   .. py:method:: getParentTransitions() -> list[FSMTransition]

      Get all the transitions that connect this state to its parents.

      :returns: All the transitions that connect this state to its parents.



   .. py:property:: during_fns
      :type: Karana.Core.VoidCallbackRegistry


      Functions that execute during the state.


   .. py:property:: on_enter_fns
      :type: Karana.Core.VoidCallbackRegistry


      Functions that execute when entering the state.


   .. py:property:: on_exit_fns
      :type: Karana.Core.VoidCallbackRegistry


      Functions that execute when exiting the state.


.. py:class:: FSMTransition(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool])

   Bases: :py:obj:`Karana.Core.Base`


   An edge in the graph that makes up an FSM.


   .. py:method:: create(parent_state: FSMState, child_state: FSMState, transition_fn: collections.abc.Callable[[], bool]) -> FSMTransition
      :staticmethod:


      Create a new instance of FSMTransition.

      :param parent_state: The parent FSMState of the edge.
      :param child_state: The child FSMState of the edge.
      :param transition_fn: The function that indicates whether the FSM
                            should transition along this edge. It should
                            return ``True`` when it is time to transition
                            and ``False`` otherwise. The FSM.update method
                            checks these functions to decide when to
                            transition.

      :returns: A pointer to the newly created instance of FSMTransition.



   .. py:method:: getChildState() -> FSMState

      Get the child state for this edge.

      :returns: The child state for this edge.



   .. py:method:: getParentState() -> FSMState

      Get the parent state for this edge.

      :returns: The parent state for this edge.



   .. py:property:: on_transition_fns
      :type: Karana.Core.VoidCallbackRegistry


      Functions that execute when transitioning on this edge.


   .. py:property:: should_transition_fn
      :type: collections.abc.Callable[[], bool]


      Function that indicates whether to transition. It returns ``True`` to
      transition and ``False`` otherwise.


.. py:class:: FSMVars

   Bases: :py:obj:`Karana.Core.BaseVars`


   Vars for the FSM class.


   .. py:property:: current_state
      :type: Karana.Core.VarString


      The current state of the FSM.


.. py:class:: H5Writer(filename: str)

   Bases: :py:obj:`Karana.Core.Base`


   Logs data to an HDF5 file. The data to be logged is configured
   via ``PacketTableConfig`` objects.


   .. py:method:: create(filename: str) -> H5Writer
      :staticmethod:


      Create an H5Writer class.

      :param filename: The name of the h5 file to create.

      :returns: A ks_ptr to the newly created instance of H5Writer.



   .. py:method:: activateTable(name: str) -> None

      Activate the table with the given name.

      :param name: The name of the table to activate.



   .. py:method:: createTable(config: PacketTableConfig) -> None

      Create a packet table to log.

      This creates the table and adds it to the active tables. Active
      tables will have a data entry added whenever log is called. To
      deactivate a table, use the deactivateTable method. The name of the
      table will be the same as the name of the PacketTableConfig.

      :param config: The config that specifies the data of the packet table.
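
      A minimal logging sketch that combines ``PacketTableConfig`` and
      ``H5Writer`` using only the calls documented on this page. The table
      name, column names, and the ``state`` dict are illustrative
      assumptions; the Karana import is deferred so the snippet loads
      without Karana installed:

      .. code-block:: python

         import math


         def log_sine_wave(filename: str, n_samples: int = 100, dt: float = 0.01):
             """Log ``n_samples`` rows of (step, time, sin(time)) to ``filename``."""
             from Karana.KUtils import H5Writer, PacketTableConfig

             # Hypothetical simulation state read by the column functions.
             state = {"step": 0, "t": 0.0}

             # The "/" places the table "sine" inside the group "signals".
             config = PacketTableConfig.create("signals/sine")
             config.addDataInt("step", lambda: state["step"])
             config.addDataFloat("time", lambda: state["t"])
             config.addDataFloat("value", lambda: math.sin(state["t"]))

             writer = H5Writer.create(filename)
             writer.createTable(config)  # the new table starts out active

             for i in range(n_samples):
                 state["step"] = i
                 state["t"] = i * dt
                 writer.log()  # adds one entry to every active table
             return writer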



   .. py:method:: deactivateTable(name: str) -> None

      Deactivate the table with the given name.

      :param name: The name of the table to deactivate.



   .. py:method:: getActiveTableNames() -> list[str]

      Get a list of the active table names.

      :returns: A list of the active table names.



   .. py:method:: getTableNames() -> list[str]

      Get a list of all the table names.

      :returns: A list of all the table names.



   .. py:method:: log() -> None

      Log data for all the active tables.



   .. py:method:: logTable(name: str) -> None

      Create a log entry for just the given table.

      This will log the table regardless of whether it is active or not.

      :param name: The name of the table to log.
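
      One way to use this together with ``deactivateTable`` is to log a table
      at a lower rate than the rest. The sketch below is an assumption about
      usage, not a prescribed pattern; ``writer`` is an existing ``H5Writer``
      and ``slow_table`` is the name of a table already created on it:

      .. code-block:: python

         def log_with_slow_table(writer, slow_table: str, n_steps: int, every: int = 10):
             """Log active tables each step and ``slow_table`` every ``every`` steps."""
             # Exclude the slow table from the per-step log() calls.
             writer.deactivateTable(slow_table)
             for step in range(n_steps):
                 writer.log()
                 if step % every == 0:
                     # logTable writes an entry even though the table is inactive.
                     writer.logTable(slow_table)
             # Return the table to the active set when done.
             writer.activateTable(slow_table)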



.. py:class:: PacketTableConfig(name: str)

   Bases: :py:obj:`Karana.Core.LockingBase`


   Define the columns and associated update functions for a PacketTable.
   The PacketTable is the table that stores data in the H5 log file.


   .. py:method:: create(name: str) -> PacketTableConfig
      :staticmethod:


      Create a packet table config.

      :param name: Name of the table. A / will be interpreted as a new group.
                   Groups will be added as needed.

      :returns: A ks_ptr to the newly created instance of PacketTableConfig.



   .. py:method:: addData(var: Karana.Core.VarVec, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarMat, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarVec3, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarVec6, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarMat33, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarMat66, as_scalars: bool = False) -> None
                  addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], vector_size: SupportsInt | SupportsIndex, as_scalars: bool = False) -> None
                  addData(name: str, f: collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, n]]], matrix_rows: SupportsInt | SupportsIndex, matrix_cols: SupportsInt | SupportsIndex, as_scalars: bool = False) -> None
                  addData(var: Karana.Core.VarDouble) -> None
                  addData(var: Karana.Core.VarFloat) -> None
                  addData(var: Karana.Core.VarInt) -> None
                  addData(var: Karana.Core.VarShort) -> None
                  addData(var: Karana.Core.VarLong) -> None

      Add data to the table via a Var_T with loggable data.

      :param var: The Var_T to add to the table.



   .. py:method:: addDataFloat(name: str, f: collections.abc.Callable[[], float]) -> None

      Add data to the table via a function.

      :param name: The name of the column in the table.
      :param f: The function that produces the data.



   .. py:method:: addDataInt(name: str, f: collections.abc.Callable[[], int]) -> None

      Add data to the table via a function.

      :param name: The name of the column in the table.
      :param f: The function that produces the data.



   .. py:method:: fillBuf() -> None

      Fill the buffer by calling all the functions.



.. py:class:: PlotData(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str)

   Bases: :py:obj:`Karana.Core.BaseWithVars`


   .. py:method:: create(plot_data: collections.abc.Sequence[SinglePlotData], host: str, port: str, target: str) -> PlotData
      :staticmethod:


      Create an instance of PlotData.

      :param plot_data: The data for the plots.
      :param host: The host for the websocket.
      :param port: The port for the websocket.
      :param target: The target for the websocket.

      :returns: A pointer to the newly created PlotData.
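
      A hedged construction sketch. The plot title, series names, and the
      ``host``/``port``/``target`` defaults are placeholders, not values
      taken from Karana; substitute whatever your plotting server expects.
      ``get_time`` and ``get_energy`` are caller-supplied functions that
      each return a ``float``:

      .. code-block:: python

         def make_energy_plot(get_time, get_energy,
                              host="localhost", port="8765", target="/"):
             """Create a PlotData with one plot of energy versus time."""
             from Karana.KUtils import PlotData, SinglePlotData

             # x_data and each y_data entry may be given as (name, function).
             plot = SinglePlotData("Energy", ("time", get_time),
                                   [("energy", get_energy)])
             return PlotData.create([plot], host, port, target)

      The ``update`` method of the returned object can then be called once
      per simulation step, followed by ``shutdown`` when plotting is finished.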



   .. py:method:: addPlot(plot_data: SinglePlotData) -> None

      Add a plot.

      :param plot_data: The data for the plot to add.



   .. py:method:: removePlot(index: SupportsInt | SupportsIndex) -> None

      Remove the plot at index.

      :param index: The index of the plot to remove.



   .. py:method:: sendWebsocketMessage(msg: str) -> None

      Send a message over the websocket. This should be used with caution
      and only if you know exactly what type of messages are allowed to be
      sent.

      :param msg: The message to send.



   .. py:method:: shutdown() -> None

      Shut down the websocket client.



   .. py:method:: update() -> None

      Get data from the SinglePlotData and send it via a websocket to the
      server.



   .. py:property:: plot_fns
      :type: list[SinglePlotData]


      The data for the plots.


.. py:class:: SinglePlotData(title: str, x_data: Karana.Core.VarDouble | tuple[str, collections.abc.Callable[[], float]], y_data: collections.abc.Sequence[Karana.Core.VarDouble | Karana.Core.VarVec3 | Karana.Core.VarVec | Karana.Core.VarSpatialVector | Karana.Core.VarSpatialVelocity | Karana.Core.VarSpatialAcceleration | Karana.Core.VarSpatialForce | tuple[str, collections.abc.Callable[[], float]] | tuple[str, collections.abc.Callable[[], Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]]], SupportsInt | SupportsIndex]])

   .. py:property:: title
      :type: str


      The title of the plot.


   .. py:property:: x_data
      :type: collections.abc.Callable[[], float]


      The function that produces the x_data.


   .. py:property:: x_data_name
      :type: str


      The name of the x_data.


   .. py:property:: y_data_fns
      :type: list[collections.abc.Callable[[], float | Annotated[numpy.typing.NDArray[numpy.float64], [3, 1]] | Annotated[numpy.typing.NDArray[numpy.float64], [m, 1]] | Karana.Math.SpatialVector | Karana.Math.SpatialVelocity | Karana.Math.SpatialAcceleration | Karana.Math.SpatialForce]]


      The y_data functions.


   .. py:property:: y_data_names
      :type: list[str]


      The y_data names.


   .. py:property:: y_data_sizes
      :type: list[int]


      The y_data function sizes.


.. py:class:: PythonReentryMonitor

   Monitor when you re-enter Python from C++.

   This class is designed to help you identify places in your simulation where you are
   re-entering the Python interpreter from C++. This can help identify places in your
   simulation where you can improve performance by migrating to C++.
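
   A usage sketch based on the context-manager methods documented below.
   ``run_simulation`` is a hypothetical caller-supplied function, and the
   import is deferred so the snippet loads without Karana installed:

   .. code-block:: python

      def report_python_reentry(run_simulation):
          """Run ``run_simulation`` under the monitor and dump what it saw."""
          from Karana.KUtils import PythonReentryMonitor

          with PythonReentryMonitor() as monitor:
              run_simulation()

          monitor.dump()   # report the captured traces
          monitor.clear()  # discard them before the monitor is reused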


   .. py:attribute:: traces
      :value: []



   .. py:method:: trace(frame, event, _)

      Trace the current line of Python code.

      This will track any call event. This does not count recursive events, so it will
      only track call events made at the top-level.

      :param frame: The Python frame.
      :param event: The event being traced.
      :param _: arg, unused.

      :returns: The trace method.



   .. py:method:: __enter__()

      Use this as a context.

      This will start monitoring all calls to Python in the context.

      :returns: This object.
      :rtype: Self



   .. py:method:: __exit__(*_)

      Exit the context.

      See __enter__ for details.

      :param \*_: Exit context parameters. Unused.

      :returns: True if this exited successfully.
      :rtype: bool



   .. py:method:: dump()

      Dump the traces captured by this monitor.



   .. py:method:: clear()

      Clear out all the traces this monitor has captured.



