# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Sim class and associated classes/functions."""
from typing import TYPE_CHECKING, Callable, Literal, Optional, cast, overload
from Karana.Dynamics.SOADyn_types import (
MultibodyWebUI,
StatePropagatorDS,
SubGraphDS,
GraphicsSetupDS,
GuiSetupDS,
)
from Karana.Dynamics import Multibody, StatePropagator, StickPartsConfig
from Karana.Frame import FrameContainer, Frame
from Karana.Core import BaseContainer, BaseWithVars, CppWeakRef, discard, Base, warn
from Karana.Integrators import IntegratorType
from Karana.Models import GilRelease, UpdateProxyScene
from Karana.Scene import CollisionScene, ProxyScene, WebScene
from Karana.Scene.ProxyScene_types import ProxySceneDS
from Karana.Scene.Scene_types import CollisionSceneDS, CollisionSceneType
from pydantic import Field, SerializeAsAny, model_validator
if TYPE_CHECKING:
from Karana.KUtils.DataStruct import DataStruct
from Karana.KUtils.Prefab import (
Prefab,
ConfigType,
ContextType,
ParamsType,
PrefabDS,
Config,
Context,
Params,
)
from Karana.KUtils.BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType
else:
from .DataStruct import DataStruct
from .Prefab import (
Prefab,
ConfigType,
ContextType,
ParamsType,
PrefabDS,
Config,
Context,
Params,
)
from .BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType
bc = BaseContainer.singleton()
@overload
def _discard(callback: Callable[[], None]) -> Callable[[], None]:
"""Create a discard method that uses a callback.
This method ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
callback: Callable[[], None]
The callback to wrap in a try/except if not in developer mode.
Returns
-------
Callable[[],None]
The discard function that uses try/except if appropriate.
"""
...
@overload
def _discard(obj: Base) -> Callable[[], None]:
"""Create a discard method that uses a CppWeakRef.
This method also ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
obj : Base
The object to discard.
Returns
-------
Callable[[],None]
The discard function that uses a CppWeakRef.
"""
...
def _discard(*args, **_):
"""Create a discard method that uses a CppWeakRef.
This method also ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
obj : Base
The object to discard.
Returns
-------
Callable[[],None]
The discard function that uses a CppWeakRef.
"""
obj = args[0]
if isinstance(obj, Base):
# Create a CppWeakRef to the object
obj_ref = CppWeakRef(obj)
def inner():
nonlocal obj_ref
obj = obj_ref()
if obj is not None:
if hasattr(bc, "check_all_destroyed"):
# We are in a developer build. Run normal discard
discard(obj)
else:
# We are in a production build. Run this in a try/except and ignore errors.
# Users can check_all_destroyed if they really want to, to ensure cleanup is done
try:
discard(obj)
except:
pass
return inner
else:
# obj is already a callable
def inner():
nonlocal obj
if hasattr(bc, "check_all_destroyed"):
# We are in a developer build. Run normal discard
obj()
else:
# We are in a production build. Run this in a try/except and ignore errors.
# Users can check_all_destroyed if they really want to, to ensure cleanup is done
try:
obj()
except:
pass
return inner
[docs]
class PrefabRoot(Prefab[Config, Context, Params]):
"""Prefab to act as the virtual root of the Prefab hierarchy."""
pass
[docs]
class Sim(BaseWithVars):
"""Simulation class.
This class helps one easily setup a simulation complete with a
Multibody and StatePropagator.
It contains helper methods to do common tasks like setup graphics or the GUI.
"""
@overload
def __init__(self):
"""Create a new instance of Sim.
An empty Multibody and StatePropagator will be created.
"""
...
@overload
def __init__(self, mb_ds: SubGraphDS | None, sp_ds: StatePropagatorDS | None):
"""Create a new instance of Sim.
Parameters
----------
mb_ds : SubGraphDS | None
The SubGraphDS to use to create the Multibody. If None, then
an empty Multibody will be created.
sp_ds : StatePropagatorDS
The StatePropagatorDS to use to create the StatePropagator.
If None, then a default StatePropagator will be created.
"""
...
@overload
def __init__(self, mb: Multibody, sp: StatePropagator):
"""Create a new instance of Sim.
This version will not register cleanup functions, as it assumes whoever created the Multibody
and StatePropagator has already done that.
Parameters
----------
mb : Multibody
The Multibody to use for this simulation.
sp : StatePropagator
The StatePropagator to use for this simulation.
"""
...
def __init__(self, *args, **_):
"""Create a new instance of Sim."""
super().__init__("sim")
# Set _gui_graphics_ds to None to indicate no GUI nor graphics have been set up yet
self._gui_graphics_ds = None
if len(args) == 2:
if isinstance(args[0], Multibody):
self.mb: Multibody = args[0]
self.sp: StatePropagator = args[1]
self.fc = self.mb.frameContainer()
else:
self.fc = FrameContainer("root")
bc.at_exit_fns["_discard_fc"] = _discard(self.fc)
mb_ds: SubGraphDS | None = args[0]
sp_ds: StatePropagatorDS | None = args[1]
if mb_ds is not None:
self.mb = mb_ds.toMultibody(self.fc)
else:
self.mb = Multibody("mb", self.fc)
bc.at_exit_fns["_discard_mb"] = _discard(self.mb)
self.mb.ensureHealthy()
if sp_ds is not None:
self.sp = sp_ds.toStatePropagator(self.mb)
else:
self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
bc.at_exit_fns["_discard_sp"] = _discard(self.sp)
self.sp.setTime(0.0)
else:
self.fc = FrameContainer("root")
bc.at_exit_fns["_discard_fc"] = _discard(self.fc)
self.mb = Multibody("multibody", self.fc)
bc.at_exit_fns["_discard_mb"] = _discard(self.mb)
self.mb.ensureHealthy()
self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
bc.at_exit_fns["_discard_sp"] = _discard(self.sp)
self.sp.setTime(0.0)
self.sp.setState(self.sp.assembleState())
self.prefab_root = PrefabRoot("prefab_root", Config(), Context(), Params())
[docs]
@classmethod
def fromDS(cls, sim_ds: "SimDS") -> "Sim":
"""Create a Sim instance from its associated DataStruct.
Parameters
----------
sim_ds : SimDS
The DataStruct to create the Sim from.
Returns
-------
Sim
A new instance of Sim created from the provided DataStruct.
"""
return sim_ds.toSim()
[docs]
def toDS(self) -> "SimDS":
"""Convert this Sim into a SimDS.
Returns
-------
SimDS
A DataStruct representation of this simulation.
"""
ps = self.mb.getScene()
collision_scenes = []
if ps is not None:
for s in ps.clientScenes():
if isinstance(s, CollisionScene):
collision_scenes.append(s.toDS())
return SimDS(
mb_ds=self.mb.toDS(),
sp_ds=self.sp.toDS(),
setup_graphics=self._gui_graphics_ds,
collision=collision_scenes,
prefabs=[p.toDS() for p in self.prefab_root._children],
)
[docs]
@classmethod
def fromBasicPrefabDS(
cls, basic_prefab_ds: BasicPrefabDS[ExtraInfoType]
) -> tuple["Sim", BasicPrefab[ExtraInfoType]]:
"""Create a Sim instance from its associated DataStruct.
Parameters
----------
basic_prefab_ds : BasicPrefabDS
The BasicPrefabDS to create a Simulation from.
Returns
-------
tuple[Sim, BasicPrefab[ExtraInfoType]]
A new instance of Sim created from the provided BasicPrefabDS, and the
associated BasicPrefab.
"""
fc = FrameContainer("root")
bc.at_exit_fns["_discard_fc"] = _discard(fc)
bp, sp = BasicPrefab[ExtraInfoType].createStandalone(basic_prefab_ds, fc)
mb = sp.getSubTree().multibody()
bc.at_exit_fns["_discard_mb"] = _discard(mb)
bc.at_exit_fns["_discard_sp"] = _discard(sp)
return Sim(mb, sp), bp
[docs]
def setupGraphics(
self,
*,
port: int = 29523,
axes: float = 1.0,
client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
| None = "auto",
origin_frame: Frame | None = None,
wait_for_clients: int = 0,
wait_for_clients_timeout: float = 0.0,
stick_parts: Literal["auto", "always", "never"] = "auto",
) -> tuple[Callable[[], None], WebScene]:
"""Set up the graphics for this simulation.
Parameters
----------
port : int = 29523
Port to bind the WebUI server to. Use 0 to request an arbitrary unused
port.
axes : float = 1.0
Length of axes visualization on root frame.
client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
Policy for launching a client.
"auto": pick the best option for the environment and OS
"electron": always launch the electron client
"notebook": always open an IFrame in IPython
"webbrowser": always open a browser tab
"selenium": always open a chrome driver using selenium
None: don't automatically open a client
Defaults to "auto".
origin_frame: Optional[Frame] = None
Frame to use as the world origin for the graphics scene. If None, use
the Multibody's virtualRoot.
wait_for_clients: int = 0
Number of client connections to wait for before continuing.
wait_for_clients_timeout : float = 0.0
Number of seconds to wait before raising an error if wait_for_clients
is positive.
stick_parts : Literal["auto", "always", "never"] = "auto"
Policy for creating stick parts. Defaults to "auto".
"auto": create stick parts if the ProxyScene is empty
"always": unconditionally create stick parts
"never": unconditionally don't create stick parts
Returns
-------
tuple[Callable, WebScene]
A tuple containing the a cleanup callable and the graphics scene.
"""
return self._setupGuiGraphics(
GraphicsSetupDS(
port=port,
axes=axes,
client_type=client_type,
origin_frame=origin_frame,
wait_for_clients=wait_for_clients,
wait_for_clients_timeout=wait_for_clients_timeout,
stick_parts=stick_parts,
)
)
[docs]
def setupGui(
self,
port=29534,
client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
| None = "auto",
wait_for_clients: int = 0,
wait_for_clients_timeout: float = 0.0,
stick_parts: Literal["auto", "always", "never"] = "auto",
stick_parts_config: StickPartsConfig | None = None,
name_to_label_map: dict[str, str] | None = None,
time_display_period: float | None = 0.01,
graphics_origin_frame: Frame | None = None,
) -> MultibodyWebUI:
"""Create the GUI for this multibody system.
Parameters
----------
self : Sim
The Sim instance
port : int
Port to bind the WebUI server to. Use 0 to request an arbitrary unused
port. Defaults to 29534.
client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
Policy for launching a client.
"auto": pick the best option for the environment and OS
"electron": always launch the electron client
"notebook": always open an IFrame in IPython
"webbrowser": always open a browser tab
"selenium": always open a chrome driver using selenium
None: don't automatically open a client
Defaults to "auto".
wait_for_clients: int
Number of client connections to wait for before continuing.
Defaults to 0.
wait_for_clients_timeout : float
Number of seconds to wait before raising an error if wait_for_clients
is positive. Defaults to 0.
graphics_origin_frame : Optional[Frame]
If given, and the GUI needs to setup graphics, use this as the
origin frame instead of the multibody virtual root.
stick_parts : Literal["auto", "always", "never"] = "auto"
Policy for creating stick parts. Defaults to "auto".
"auto": create stick parts if the ProxyScene is empty
"always": unconditionally create stick parts
"never": unconditionally don't create stick parts
stick_parts_config : StickPartsConfig | None
Configuration parameters for the stick parts.
name_to_label_map: dict[str, str] | None
Dictionary defining the labels to use for each body in the visjs graphs
display
time_display_period : float | None
If not None and sp is given, add a time display refreshing
with this period in simulation time. Defaults to 0.01.
"""
gui = self._setupGuiGraphics(
GuiSetupDS(
port=port,
client_type=client_type,
wait_for_clients=wait_for_clients,
wait_for_clients_timeout=wait_for_clients_timeout,
stick_parts=stick_parts,
stick_parts_config=stick_parts_config,
name_to_label_map=name_to_label_map,
sp=self.sp,
sim=self, # pyright: ignore
time_display_period=time_display_period,
graphics_origin_frame=graphics_origin_frame,
)
)
return gui
@overload
def _setupGuiGraphics(self, setup: GraphicsSetupDS) -> tuple[Callable[[], None], WebScene]:
"""Set up graphics using the provided DataStruct.
Parameters
----------
setup : GraphicsSetupDS
The options for setting up the graphics.
Returns
-------
tuple[Callable[[], None], WebScene]
The cleanup callback and WebScene.
"""
...
@overload
def _setupGuiGraphics(self, setup: GuiSetupDS) -> MultibodyWebUI:
"""Set up the GUI using the provided DataStruct.
Parameters
----------
setup : GuiSetupDS
The options for setting up the GUI.
Returns
-------
MultibodyWebUI
The GUI.
"""
...
def _setupGuiGraphics(self, *args, **_):
"""Set up the GUI or graphics.
See overloads for details.
"""
gui_graphics = cast(GuiSetupDS | GraphicsSetupDS, args[0])
if self._gui_graphics_ds is not None:
if hasattr(self, "gui"):
warn("The GUI is already set up. Setting up again is rarely intended.")
elif isinstance(gui_graphics, GraphicsSetupDS):
warn("Graphics was already set up. Setting it up again is rarely intended.")
gui_only = False
if isinstance(self._gui_graphics_ds, GraphicsSetupDS) and not hasattr(self, "gui"):
gui_only = True
if self._gui_graphics_ds is not None:
gui_graphics.proxy_scene_id = self._gui_graphics_ds.proxy_scene_id
self._gui_graphics_ds = gui_graphics
if isinstance(self._gui_graphics_ds, GuiSetupDS):
if gui_only:
# If running GUI only, someone already set up the graphics. Just setup the gui and
# register its callback.
gui = self._gui_graphics_ds.setupGui(self.mb, self) # pyright: ignore
self.gui = gui
GilRelease("gil_release", self.sp)
bc.at_exit_fns["_cleanup_gui"] = _discard(gui.close)
return gui
else:
# Set up the GUI and graphics. Separate the two for cleanup purposes.
gui, cleanup_graphics, _ = self._gui_graphics_ds.setupGui( # pyright: ignore
self.mb,
self, # pyright: ignore
setup_graphics_first=True,
)
self.gui = gui
# Register these cleanup functions if we own the StatePropagator
if "_discard_sp" in bc.at_exit_fns:
bc.at_exit_fns.insertBefore(
"_discard_sp", "_cleanup_graphics", _discard(cleanup_graphics)
)
bc.at_exit_fns["_cleanup_gui"] = _discard(gui.close)
ps = cast(ProxyScene, self.mb.getScene())
UpdateProxyScene("update_proxy_scene", self.sp, ps)
GilRelease("gil_release", self.sp)
# Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
self._gui_graphics_ds.proxy_scene_id = ps.id()
return self.gui
else:
cleanup_graphics, _ = self._gui_graphics_ds.setupGraphics(self.mb)
# Register the cleanup function if we own the StatePropagator
if "_discard_sp" in bc.at_exit_fns:
bc.at_exit_fns.insertBefore(
"_discard_sp", "_cleanup_graphics", _discard(cleanup_graphics)
)
ps = cast(ProxyScene, self.mb.getScene())
UpdateProxyScene("update_proxy_scene", self.sp, ps)
# Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
self._gui_graphics_ds.proxy_scene_id = ps.id()
return cleanup_graphics, _
[docs]
class SimDS(DataStruct):
"""DataStruct representation of a simulation.
Parameters
----------
sp_ds : StatePropagatorDS
The StatePropagator DataStruct.
mb_ds: SubGraphDS
The Multibody DataStruct.
setup_graphics: GuiSetupDS | GraphicsSetupDS | None
The GUI or graphics setup DataStruct.
collision: list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = [] # pyright: ignore
The CollisionScene DataStructs.
prefabs: list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]] # pyright: ignore
The Prefab DataStructs.
"""
sp_ds: StatePropagatorDS
mb_ds: SubGraphDS
setup_graphics: Optional[GuiSetupDS | GraphicsSetupDS] = Field(
discriminator="graphics_type", default=None
)
collision: list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = [] # pyright: ignore
prefabs: list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]] # pyright: ignore
@model_validator(mode="after")
def _checkSceneConsistencty(self) -> "SimDS":
"""Validate that setup_graphics exist if collision scenes are present."""
if len(self.collision) > 0 and self.setup_graphics is None:
raise ValueError("You must have setup_graphics set if collision scenes exist on SimDS.")
return self
[docs]
def toSim(
self,
) -> Sim:
"""Create a simulation from this SimDS.
Returns
-------
Sim
The Sim created from this SimDS.
"""
fc = FrameContainer("root")
bc.at_exit_fns["_discard_fc"] = _discard(fc)
mb = self.mb_ds.toMultibody(fc)
bc.at_exit_fns["_discard_mb"] = _discard(mb)
mb.ensureHealthy()
if self.setup_graphics is not None:
if isinstance(self.setup_graphics, GraphicsSetupDS):
cleanup_graphics, _ = self.setup_graphics.setupGraphics(mb)
else:
cleanup_graphics, _ = self.setup_graphics.setupGraphicsOnly(mb)
bc.at_exit_fns["_cleanup_graphics"] = _discard(cleanup_graphics)
# Create a DS of ProxyScnene to track it in the system for other models that use it.
proxy_ds = ProxySceneDS.fromProxyScene(cast(ProxyScene, mb.getScene()))
if self.setup_graphics.proxy_scene_id is not None and proxy_ds._id is not None:
# Reconfigure the IDs to match what they were previously for bookkeeping purposes
proxy_ds._all_objects_from_ids[self.setup_graphics.proxy_scene_id] = (
proxy_ds._all_objects_from_ids.pop(proxy_ds._id)
)
proxy_ds._id = self.setup_graphics.proxy_scene_id
p = cast(ProxyScene, mb.getScene())
if len(self.collision) > 0:
if p is None:
raise ValueError("Cannot register collision scenes without a proxy scene.")
for c in self.collision:
sc = c.toCollisionScene()
p.registerClientScene(sc, cast(Frame, mb.virtualRoot()))
sp = self.sp_ds.toStatePropagator(mb)
bc.at_exit_fns["_discard_sp"] = _discard(sp)
sp.setTime(0.0)
sp.setState(sp.assembleState())
s = Sim(mb, sp)
if isinstance(self.setup_graphics, GuiSetupDS):
self.setup_graphics.sp = sp
gui = self.setup_graphics.setupGui(mb, s) # pyright: ignore
s.gui = gui
bc.at_exit_fns["_cleanup_gui"] = _discard(gui.close)
s._gui_graphics_ds = self.setup_graphics
for x in self.prefabs:
x.toPrefab(s.prefab_root)
return s