# Source code for Karana.KUtils.Sim

# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""Sim class and associated classes/functions."""

from typing import TYPE_CHECKING, Callable, Literal, Optional, cast, overload
from Karana.Dynamics.SOADyn_types import (
    MultibodyWebUI,
    StatePropagatorDS,
    SubGraphDS,
    GraphicsSetupDS,
    GuiSetupDS,
)
from Karana.Dynamics import Multibody, StatePropagator, StickPartsConfig
from Karana.Frame import FrameContainer, Frame
from Karana.Core import BaseContainer, BaseWithVars, CppWeakRef, discard, Base, warn
from Karana.Integrators import IntegratorType
from Karana.Models import GilRelease, UpdateProxyScene
from Karana.Scene import CollisionScene, ProxyScene, WebScene
from Karana.Scene.ProxyScene_types import ProxySceneDS
from Karana.Scene.Scene_types import CollisionSceneDS, CollisionSceneType
from pydantic import Field, SerializeAsAny, model_validator

if TYPE_CHECKING:
    from Karana.KUtils.DataStruct import DataStruct
    from Karana.KUtils.Prefab import (
        Prefab,
        ConfigType,
        ContextType,
        ParamsType,
        PrefabDS,
        Config,
        Context,
        Params,
    )
    from Karana.KUtils.BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType
else:
    from .DataStruct import DataStruct
    from .Prefab import (
        Prefab,
        ConfigType,
        ContextType,
        ParamsType,
        PrefabDS,
        Config,
        Context,
        Params,
    )
    from .BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType

# The BaseContainer singleton. This module registers its cleanup callbacks in bc.at_exit_fns,
# and checks for a check_all_destroyed attribute on it to tell developer builds from
# production builds.
bc = BaseContainer.singleton()


@overload
def _discard(callback: Callable[[], None]) -> Callable[[], None]:
    """Create a discard method that uses a callback.

    This method ignores cleanup errors if not in developer mode. Users typically don't need
    to worry about this, as the OS will tear down things for them. If they do need to worry about it,
    e.g., they are creating multiple simulations in the same interpreter, then they can use
    allDestroyed to check this.

    Parameters
    ----------
    callback: Callable[[], None]
        The callback to wrap in a try/except if not in developer mode.

    Returns
    -------
    Callable[[],None]
        The discard function that uses try/except if appropriate.
    """
    ...


@overload
def _discard(obj: Base) -> Callable[[], None]:
    """Create a discard method that uses a CppWeakRef.

    This method also ignores cleanup errors if not in developer mode. Users typically don't need
    to worry about this, as the OS will tear down things for them. If they do need to worry about it,
    e.g., they are creating multiple simulations in the same interpreter, then they can use
    allDestroyed to check this.

    Parameters
    ----------
    obj : Base
        The object to discard.

    Returns
    -------
    Callable[[],None]
        The discard function that uses a CppWeakRef.
    """
    ...


def _discard(*args, **kwargs):
    """Create a zero-argument cleanup function for an object or a callback.

    If the argument is a Base instance, only a CppWeakRef to it is kept, and the returned
    function discards the object if the reference is still alive when it runs. Any other
    argument is treated as a zero-argument callable and is invoked as-is.

    In a developer build (detected by ``check_all_destroyed`` being present on the
    BaseContainer singleton) errors raised during cleanup propagate. Otherwise they are
    ignored. Users typically don't need to worry about this, as the OS will tear down things
    for them. If they do need to worry about it, e.g., they are creating multiple simulations
    in the same interpreter, then they can use allDestroyed to check this.

    Parameters
    ----------
    obj : Base
        The object to discard. May be passed positionally or by keyword.
    callback : Callable[[], None]
        The callback to run. May be passed positionally or by keyword.

    Returns
    -------
    Callable[[],None]
        The cleanup function.

    Raises
    ------
    TypeError
        If neither an object nor a callback is given.
    """
    # The overloads advertise `obj` and `callback` as parameter names, so honor keyword calls too.
    if args:
        target = args[0]
    elif "obj" in kwargs:
        target = kwargs["obj"]
    elif "callback" in kwargs:
        target = kwargs["callback"]
    else:
        raise TypeError("_discard() requires an object or a callback to wrap.")

    def run_guarded(action):
        """Run action, ignoring its errors unless this is a developer build."""
        if hasattr(bc, "check_all_destroyed"):
            # We are in a developer build. Let errors propagate.
            action()
            return

        # We are in a production build. Ignore cleanup errors; users can check_all_destroyed
        # if they really want to ensure cleanup is done. Only Exception is caught so that
        # KeyboardInterrupt and SystemExit are never swallowed.
        try:
            action()
        except Exception:
            pass

    if isinstance(target, Base):
        # Hold only a weak reference so the returned closure does not keep the object alive
        obj_ref = CppWeakRef(target)

        def inner():
            obj = obj_ref()
            if obj is not None:
                run_guarded(lambda: discard(obj))

        return inner

    # target is already a callable
    def inner():
        run_guarded(target)

    return inner


class PrefabRoot(Prefab[Config, Context, Params]):
    """Virtual root Prefab that sits at the top of the Prefab hierarchy."""
class Sim(BaseWithVars):
    """Simulation class.

    This class helps one easily set up a simulation complete with a Multibody and
    StatePropagator. It contains helper methods to do common tasks like setting up graphics
    or the GUI.
    """

    @overload
    def __init__(self):
        """Create a new instance of Sim.

        An empty Multibody and StatePropagator will be created.
        """
        ...

    @overload
    def __init__(self, mb_ds: SubGraphDS | None, sp_ds: StatePropagatorDS | None):
        """Create a new instance of Sim.

        Parameters
        ----------
        mb_ds : SubGraphDS | None
            The SubGraphDS to use to create the Multibody. If None, then an empty Multibody
            will be created.
        sp_ds : StatePropagatorDS | None
            The StatePropagatorDS to use to create the StatePropagator. If None, then a default
            StatePropagator will be created.
        """
        ...

    @overload
    def __init__(self, mb: Multibody, sp: StatePropagator):
        """Create a new instance of Sim.

        This version will not register cleanup functions, as it assumes whoever created the
        Multibody and StatePropagator has already done that.

        Parameters
        ----------
        mb : Multibody
            The Multibody to use for this simulation.
        sp : StatePropagator
            The StatePropagator to use for this simulation.
        """
        ...

    def __init__(self, *args, **kwargs):
        """Create a new instance of Sim.

        See the overloads for the supported argument combinations. The two arguments may be
        given positionally or by the keyword names the overloads declare. Arguments that match
        none of the overloads are ignored with a warning, and an empty simulation is created.
        """
        super().__init__("sim")

        # Set _gui_graphics_ds to None to indicate no GUI nor graphics have been set up yet
        self._gui_graphics_ds = None

        # Resolve the two overload arguments from positionals first, then from the keyword
        # names the overloads declare. A sentinel is needed as None is a valid mb_ds / sp_ds.
        missing = object()
        first = args[0] if len(args) >= 1 else kwargs.get("mb", kwargs.get("mb_ds", missing))
        second = args[1] if len(args) >= 2 else kwargs.get("sp", kwargs.get("sp_ds", missing))
        has_pair = len(args) <= 2 and first is not missing and second is not missing

        if not has_pair and (args or kwargs):
            warn(
                "Sim was given arguments that match none of its overloads. "
                "They are ignored and an empty simulation is created."
            )

        if has_pair:
            if isinstance(first, Multibody):
                # Existing objects: whoever created them owns their cleanup
                self.mb: Multibody = first
                self.sp: StatePropagator = second
                self.fc = self.mb.frameContainer()
            else:
                self.fc = FrameContainer("root")
                bc.at_exit_fns["_discard_fc"] = _discard(self.fc)

                mb_ds: SubGraphDS | None = first
                sp_ds: StatePropagatorDS | None = second

                if mb_ds is not None:
                    self.mb = mb_ds.toMultibody(self.fc)
                else:
                    self.mb = Multibody("mb", self.fc)
                bc.at_exit_fns["_discard_mb"] = _discard(self.mb)
                self.mb.ensureHealthy()

                if sp_ds is not None:
                    self.sp = sp_ds.toStatePropagator(self.mb)
                else:
                    self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
                bc.at_exit_fns["_discard_sp"] = _discard(self.sp)
                self.sp.setTime(0.0)
        else:
            self.fc = FrameContainer("root")
            bc.at_exit_fns["_discard_fc"] = _discard(self.fc)

            self.mb = Multibody("multibody", self.fc)
            bc.at_exit_fns["_discard_mb"] = _discard(self.mb)
            self.mb.ensureHealthy()

            self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
            bc.at_exit_fns["_discard_sp"] = _discard(self.sp)
            self.sp.setTime(0.0)

        # NOTE(review): the state initialization is taken to apply to every construction
        # path, not just the default one - confirm.
        self.sp.setState(self.sp.assembleState())

        self.prefab_root = PrefabRoot("prefab_root", Config(), Context(), Params())

    @classmethod
    def fromDS(cls, sim_ds: "SimDS") -> "Sim":
        """Create a Sim instance from its associated DataStruct.

        Parameters
        ----------
        sim_ds : SimDS
            The DataStruct to create the Sim from.

        Returns
        -------
        Sim
            A new instance of Sim created from the provided DataStruct.
        """
        return sim_ds.toSim()

    def toDS(self) -> "SimDS":
        """Convert this Sim into a SimDS.

        Returns
        -------
        SimDS
            A DataStruct representation of this simulation.
        """
        ps = self.mb.getScene()

        # Only the CollisionScene clients of the ProxyScene are stored on the SimDS
        if ps is None:
            collision_scenes = []
        else:
            collision_scenes = [s.toDS() for s in ps.clientScenes() if isinstance(s, CollisionScene)]

        return SimDS(
            mb_ds=self.mb.toDS(),
            sp_ds=self.sp.toDS(),
            setup_graphics=self._gui_graphics_ds,
            collision=collision_scenes,
            prefabs=[p.toDS() for p in self.prefab_root._children],
        )

    @classmethod
    def fromBasicPrefabDS(
        cls, basic_prefab_ds: BasicPrefabDS[ExtraInfoType]
    ) -> tuple["Sim", BasicPrefab[ExtraInfoType]]:
        """Create a Sim instance from a BasicPrefabDS.

        Parameters
        ----------
        basic_prefab_ds : BasicPrefabDS
            The BasicPrefabDS to create a Simulation from.

        Returns
        -------
        tuple[Sim, BasicPrefab[ExtraInfoType]]
            A new instance of Sim created from the provided BasicPrefabDS, and the associated
            BasicPrefab.
        """
        fc = FrameContainer("root")
        bc.at_exit_fns["_discard_fc"] = _discard(fc)

        bp, sp = BasicPrefab[ExtraInfoType].createStandalone(basic_prefab_ds, fc)
        mb = sp.getSubTree().multibody()

        # Cleanup is registered here, as Sim(mb, sp) does not register any itself
        bc.at_exit_fns["_discard_mb"] = _discard(mb)
        bc.at_exit_fns["_discard_sp"] = _discard(sp)

        return Sim(mb, sp), bp

    def setupGraphics(
        self,
        *,
        port: int = 29523,
        axes: float = 1.0,
        client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
        | None = "auto",
        origin_frame: Frame | None = None,
        wait_for_clients: int = 0,
        wait_for_clients_timeout: float = 0.0,
        stick_parts: Literal["auto", "always", "never"] = "auto",
    ) -> tuple[Callable[[], None], WebScene]:
        """Set up the graphics for this simulation.

        Parameters
        ----------
        port : int = 29523
            Port to bind the WebUI server to. Use 0 to request an arbitrary unused port.
        axes : float = 1.0
            Length of axes visualization on root frame.
        client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
            Policy for launching a client. Defaults to "auto".
            "auto": pick the best option for the environment and OS
            "electron": always launch the electron client
            "notebook": always open an IFrame in IPython
            "webbrowser": always open a browser tab
            "selenium": always open a chrome driver using selenium
            None: don't automatically open a client
        origin_frame : Optional[Frame] = None
            Frame to use as the world origin for the graphics scene. If None, use the
            Multibody's virtualRoot.
        wait_for_clients : int = 0
            Number of client connections to wait for before continuing.
        wait_for_clients_timeout : float = 0.0
            Number of seconds to wait before raising an error if wait_for_clients is positive.
        stick_parts : Literal["auto", "always", "never"] = "auto"
            Policy for creating stick parts. Defaults to "auto".
            "auto": create stick parts if the ProxyScene is empty
            "always": unconditionally create stick parts
            "never": unconditionally don't create stick parts

        Returns
        -------
        tuple[Callable, WebScene]
            A tuple containing a cleanup callable and the graphics scene.
        """
        return self._setupGuiGraphics(
            GraphicsSetupDS(
                port=port,
                axes=axes,
                client_type=client_type,
                origin_frame=origin_frame,
                wait_for_clients=wait_for_clients,
                wait_for_clients_timeout=wait_for_clients_timeout,
                stick_parts=stick_parts,
            )
        )

    def setupGui(
        self,
        port: int = 29534,
        client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
        | None = "auto",
        wait_for_clients: int = 0,
        wait_for_clients_timeout: float = 0.0,
        stick_parts: Literal["auto", "always", "never"] = "auto",
        stick_parts_config: StickPartsConfig | None = None,
        name_to_label_map: dict[str, str] | None = None,
        time_display_period: float | None = 0.01,
        graphics_origin_frame: Frame | None = None,
    ) -> MultibodyWebUI:
        """Create the GUI for this multibody system.

        Parameters
        ----------
        port : int
            Port to bind the WebUI server to. Use 0 to request an arbitrary unused port.
            Defaults to 29534.
        client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
            Policy for launching a client. Defaults to "auto".
            "auto": pick the best option for the environment and OS
            "electron": always launch the electron client
            "notebook": always open an IFrame in IPython
            "webbrowser": always open a browser tab
            "selenium": always open a chrome driver using selenium
            None: don't automatically open a client
        wait_for_clients : int
            Number of client connections to wait for before continuing. Defaults to 0.
        wait_for_clients_timeout : float
            Number of seconds to wait before raising an error if wait_for_clients is positive.
            Defaults to 0.
        stick_parts : Literal["auto", "always", "never"] = "auto"
            Policy for creating stick parts. Defaults to "auto".
            "auto": create stick parts if the ProxyScene is empty
            "always": unconditionally create stick parts
            "never": unconditionally don't create stick parts
        stick_parts_config : StickPartsConfig | None
            Configuration parameters for the stick parts.
        name_to_label_map : dict[str, str] | None
            Dictionary defining the labels to use for each body in the visjs graphs display.
        time_display_period : float | None
            If not None, add a time display refreshing with this period in simulation time.
            Defaults to 0.01.
        graphics_origin_frame : Optional[Frame]
            If given, and the GUI needs to set up graphics, use this as the origin frame
            instead of the multibody virtual root.

        Returns
        -------
        MultibodyWebUI
            The GUI.
        """
        gui = self._setupGuiGraphics(
            GuiSetupDS(
                port=port,
                client_type=client_type,
                wait_for_clients=wait_for_clients,
                wait_for_clients_timeout=wait_for_clients_timeout,
                stick_parts=stick_parts,
                stick_parts_config=stick_parts_config,
                name_to_label_map=name_to_label_map,
                sp=self.sp,
                sim=self,  # pyright: ignore
                time_display_period=time_display_period,
                graphics_origin_frame=graphics_origin_frame,
            )
        )
        return gui

    @overload
    def _setupGuiGraphics(self, setup: GraphicsSetupDS) -> tuple[Callable[[], None], WebScene]:
        """Set up graphics using the provided DataStruct.

        Parameters
        ----------
        setup : GraphicsSetupDS
            The options for setting up the graphics.

        Returns
        -------
        tuple[Callable[[], None], WebScene]
            The cleanup callback and WebScene.
        """
        ...

    @overload
    def _setupGuiGraphics(self, setup: GuiSetupDS) -> MultibodyWebUI:
        """Set up the GUI using the provided DataStruct.

        Parameters
        ----------
        setup : GuiSetupDS
            The options for setting up the GUI.

        Returns
        -------
        MultibodyWebUI
            The GUI.
        """
        ...

    def _setupGuiGraphics(self, *args, **kwargs):
        """Set up the GUI or graphics.

        See overloads for details. The setup DataStruct may be given positionally or as the
        ``setup`` keyword.

        Raises
        ------
        TypeError
            If no setup DataStruct is given.
        """
        if args:
            setup = args[0]
        elif "setup" in kwargs:
            setup = kwargs["setup"]
        else:
            raise TypeError("_setupGuiGraphics() missing required argument: 'setup'")
        gui_graphics = cast(GuiSetupDS | GraphicsSetupDS, setup)

        previous_ds = self._gui_graphics_ds
        has_gui = hasattr(self, "gui")

        if previous_ds is not None:
            if has_gui:
                warn("The GUI is already set up. Setting up again is rarely intended.")
            elif isinstance(gui_graphics, GraphicsSetupDS):
                warn("Graphics was already set up. Setting it up again is rarely intended.")

        # If graphics exist without a GUI, then a GUI request only needs to add the GUI
        gui_only = isinstance(previous_ds, GraphicsSetupDS) and not has_gui

        # Carry the ProxyScene id over from the previous setup
        if previous_ds is not None:
            gui_graphics.proxy_scene_id = previous_ds.proxy_scene_id

        self._gui_graphics_ds = gui_graphics

        if isinstance(self._gui_graphics_ds, GuiSetupDS):
            if gui_only:
                # If running GUI only, someone already set up the graphics. Just set up the
                # gui and register its callback.
                gui = self._gui_graphics_ds.setupGui(self.mb, self)  # pyright: ignore
                self.gui = gui
                GilRelease("gil_release", self.sp)
                bc.at_exit_fns["_cleanup_gui"] = _discard(gui.close)
                return gui

            # Set up the GUI and graphics. Separate the two for cleanup purposes.
            gui, cleanup_graphics, _ = self._gui_graphics_ds.setupGui(  # pyright: ignore
                self.mb,
                self,  # pyright: ignore
                setup_graphics_first=True,
            )
            self.gui = gui

            # Register these cleanup functions if we own the StatePropagator
            # NOTE(review): both registrations are taken to be conditional - confirm.
            if "_discard_sp" in bc.at_exit_fns:
                bc.at_exit_fns.insertBefore(
                    "_discard_sp", "_cleanup_graphics", _discard(cleanup_graphics)
                )
                bc.at_exit_fns["_cleanup_gui"] = _discard(gui.close)

            ps = cast(ProxyScene, self.mb.getScene())
            UpdateProxyScene("update_proxy_scene", self.sp, ps)
            GilRelease("gil_release", self.sp)

            # Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
            self._gui_graphics_ds.proxy_scene_id = ps.id()
            return self.gui

        cleanup_graphics, web_scene = self._gui_graphics_ds.setupGraphics(self.mb)

        # Register the cleanup function if we own the StatePropagator
        if "_discard_sp" in bc.at_exit_fns:
            bc.at_exit_fns.insertBefore(
                "_discard_sp", "_cleanup_graphics", _discard(cleanup_graphics)
            )

        ps = cast(ProxyScene, self.mb.getScene())
        UpdateProxyScene("update_proxy_scene", self.sp, ps)

        # Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
        self._gui_graphics_ds.proxy_scene_id = ps.id()
        return cleanup_graphics, web_scene
class SimDS(DataStruct):
    """DataStruct representation of a simulation.

    Parameters
    ----------
    sp_ds : StatePropagatorDS
        DataStruct for the StatePropagator.
    mb_ds : SubGraphDS
        DataStruct for the Multibody.
    setup_graphics : GuiSetupDS | GraphicsSetupDS | None
        DataStruct for the GUI or graphics setup, if any.
    collision : list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = []
        DataStructs for the CollisionScenes.
    prefabs : list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]]
        DataStructs for the Prefabs.
    """

    sp_ds: StatePropagatorDS
    mb_ds: SubGraphDS
    setup_graphics: Optional[GuiSetupDS | GraphicsSetupDS] = Field(
        default=None, discriminator="graphics_type"
    )
    collision: list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = []  # pyright: ignore
    prefabs: list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]]  # pyright: ignore

    @model_validator(mode="after")
    def _checkSceneConsistencty(self) -> "SimDS":
        """Reject collision scenes that come without a setup_graphics entry."""
        if self.collision and self.setup_graphics is None:
            raise ValueError("You must have setup_graphics set if collision scenes exist on SimDS.")
        return self

    def toSim(self) -> Sim:
        """Build a simulation from this SimDS.

        Returns
        -------
        Sim
            The Sim built from this SimDS.
        """
        root_fc = FrameContainer("root")
        bc.at_exit_fns["_discard_fc"] = _discard(root_fc)

        multibody = self.mb_ds.toMultibody(root_fc)
        bc.at_exit_fns["_discard_mb"] = _discard(multibody)
        multibody.ensureHealthy()

        graphics_ds = self.setup_graphics
        if graphics_ds is not None:
            # A GuiSetupDS sets up only its graphics here; its GUI is added once the Sim exists
            if isinstance(graphics_ds, GraphicsSetupDS):
                graphics_setup = graphics_ds.setupGraphics
            else:
                graphics_setup = graphics_ds.setupGraphicsOnly
            cleanup_graphics, _ = graphics_setup(multibody)
            bc.at_exit_fns["_cleanup_graphics"] = _discard(cleanup_graphics)

            # Create a DS of the ProxyScene to track it in the system for other models that use it
            proxy_ds = ProxySceneDS.fromProxyScene(cast(ProxyScene, multibody.getScene()))
            saved_id = graphics_ds.proxy_scene_id
            if saved_id is not None and proxy_ds._id is not None:
                # Re-key the bookkeeping entry so the id matches the one that was saved
                current_entry = proxy_ds._all_objects_from_ids.pop(proxy_ds._id)
                proxy_ds._all_objects_from_ids[saved_id] = current_entry
                proxy_ds._id = saved_id

        proxy_scene = cast(ProxyScene, multibody.getScene())
        if self.collision:
            if proxy_scene is None:
                raise ValueError("Cannot register collision scenes without a proxy scene.")
            for collision_ds in self.collision:
                proxy_scene.registerClientScene(
                    collision_ds.toCollisionScene(), cast(Frame, multibody.virtualRoot())
                )

        propagator = self.sp_ds.toStatePropagator(multibody)
        bc.at_exit_fns["_discard_sp"] = _discard(propagator)
        propagator.setTime(0.0)
        propagator.setState(propagator.assembleState())

        sim = Sim(multibody, propagator)

        if isinstance(graphics_ds, GuiSetupDS):
            graphics_ds.sp = propagator
            sim.gui = graphics_ds.setupGui(multibody, sim)  # pyright: ignore
            bc.at_exit_fns["_cleanup_gui"] = _discard(sim.gui.close)

        sim._gui_graphics_ds = graphics_ds

        for prefab_ds in self.prefabs:
            prefab_ds.toPrefab(sim.prefab_root)

        return sim