# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Sim class and associated classes/functions."""
import h5py
import textwrap
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Literal, Optional, cast, overload, IO
from Karana.Dynamics.SOADyn_types import (
MultibodyWebUI,
StatePropagatorDS,
SubGraphDS,
GraphicsSetupDS,
GuiSetupDS,
)
from Karana.Dynamics import Multibody, StatePropagator, StickPartsConfig
from Karana.Frame import FrameContainer, Frame
from Karana.Core import BaseContainer, BaseWithVars, CppWeakRef, discard, Base, warn
from Karana.Integrators import IntegratorType
import Karana.Models as kmdl
import Karana.WebUI as kw
from Karana.Scene import CollisionScene, ProxyScene, WebScene
from Karana.Scene.ProxyScene_types import ProxySceneDS
from Karana.Scene.Scene_types import CollisionSceneDS, CollisionSceneType
from pydantic import Field, SerializeAsAny, model_validator
if TYPE_CHECKING:
from Karana.KUtils.DataStruct import DataStruct, Meta
from Karana.KUtils._toPythonScript import VarPythonScriptContext, writePythonScript
from Karana.KUtils.Prefab import (
Prefab,
ConfigType,
ContextType,
ParamsType,
PrefabDS,
Config,
Context,
Params,
)
from Karana.KUtils.BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType
from Karana.KUtils.DataPlotter import SinglePlotData, DashApp
else:
from .DataStruct import DataStruct, Meta
from ._toPythonScript import VarPythonScriptContext, writePythonScript
from .Prefab import (
Prefab,
ConfigType,
ContextType,
ParamsType,
PrefabDS,
Config,
Context,
Params,
)
from .BasicPrefab import BasicPrefabDS, BasicPrefab, ExtraInfoType
from .DataPlotter import SinglePlotData, DashApp
bc = BaseContainer.singleton()
@overload
def _discard(callback: Callable[[], None]) -> Callable[[], None]:
"""Create a discard method that uses a callback.
This method ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
callback: Callable[[], None]
The callback to wrap in a try/except if not in developer mode.
Returns
-------
Callable[[],None]
The discard function that uses try/except if appropriate.
"""
...
@overload
def _discard(obj: Base) -> Callable[[], None]:
"""Create a discard method that uses a CppWeakRef.
This method also ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
obj : Base
The object to discard.
Returns
-------
Callable[[],None]
The discard function that uses a CppWeakRef.
"""
...
def _discard(*args, **_):
"""Create a discard method that uses a CppWeakRef.
This method also ignores cleanup errors if not in developer mode. Users typically don't need
to worry about this, as the OS will tear down things for them. If they do need to worry about it,
e.g., they are creating multiple simulations in the same interpreter, then then can use
allDestroyed to check this.
Parameters
----------
obj : Base
The object to discard.
Returns
-------
Callable[[],None]
The discard function that uses a CppWeakRef.
"""
obj = args[0]
if isinstance(obj, Base):
# Create a CppWeakRef to the object
obj_ref = CppWeakRef(obj)
def inner():
nonlocal obj_ref
obj = obj_ref()
if obj is not None:
if hasattr(bc, "check_all_destroyed"):
# We are in a developer build. Run normal discard
discard(obj)
else:
# We are in a production build. Run this in a try/except and ignore errors.
# Users can check_all_destroyed if they really want to, to ensure cleanup is done
try:
discard(obj)
except:
pass
return inner
else:
# obj is already a callable
def inner():
nonlocal obj
if hasattr(bc, "check_all_destroyed"):
# We are in a developer build. Run normal discard
obj()
else:
# We are in a production build. Run this in a try/except and ignore errors.
# Users can check_all_destroyed if they really want to, to ensure cleanup is done
try:
obj()
except:
pass
return inner
[docs]
class PrefabRoot(Prefab[Config, Context, Params]):
"""Prefab to act as the virtual root of the Prefab hierarchy."""
pass
[docs]
class Sim(BaseWithVars):
"""Simulation class.
This class helps one easily setup a simulation complete with a
Multibody and StatePropagator.
It contains helper methods to do common tasks like setup graphics or the GUI.
"""
@overload
def __init__(self):
"""Create a new instance of Sim.
An empty Multibody and StatePropagator will be created.
"""
...
@overload
def __init__(self, mb_ds: SubGraphDS | None, sp_ds: StatePropagatorDS | None):
"""Create a new instance of Sim.
Parameters
----------
mb_ds : SubGraphDS | None
The SubGraphDS to use to create the Multibody. If None, then
an empty Multibody will be created.
sp_ds : StatePropagatorDS
The StatePropagatorDS to use to create the StatePropagator.
If None, then a default StatePropagator will be created.
"""
...
@overload
def __init__(self, mb: Multibody, sp: StatePropagator):
"""Create a new instance of Sim.
This version will not register cleanup functions, as it assumes whoever created the Multibody
and StatePropagator has already done that.
Parameters
----------
mb : Multibody
The Multibody to use for this simulation.
sp : StatePropagator
The StatePropagator to use for this simulation.
"""
...
def __init__(self, *args, **_):
"""Create a new instance of Sim."""
super().__init__("sim")
# Set _gui_graphics_ds to None to indicate no GUI nor graphics have been set up yet
self._gui_graphics_ds = None
if len(args) == 2:
if isinstance(args[0], Multibody):
self.mb: Multibody = args[0]
self.sp: StatePropagator = args[1]
self.fc = self.mb.frameContainer()
else:
self.fc = FrameContainer("root")
bc.at_exit_fns[f"_discard_fc_{self.fc.id()}"] = _discard(self.fc)
mb_ds: SubGraphDS | None = args[0]
sp_ds: StatePropagatorDS | None = args[1]
if mb_ds is not None:
self.mb = mb_ds.toMultibody(self.fc)
else:
self.mb = Multibody("mb", self.fc)
bc.at_exit_fns[f"_discard_mb_{self.mb.id()}"] = _discard(self.mb)
self.mb.ensureHealthy()
if sp_ds is not None:
self.sp = sp_ds.toStatePropagator(self.mb)
else:
self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
bc.at_exit_fns[f"_discard_sp_{self.sp.id()}"] = _discard(self.sp)
self.sp.setTime(0.0)
else:
self.fc = FrameContainer("root")
bc.at_exit_fns[f"_discard_fc_{self.fc.id()}"] = _discard(self.fc)
self.mb = Multibody("multibody", self.fc)
bc.at_exit_fns[f"_discard_mb_{self.mb.id()}"] = _discard(self.mb)
self.mb.ensureHealthy()
self.sp = StatePropagator(self.mb, IntegratorType.CVODE)
bc.at_exit_fns[f"_discard_sp_{self.sp.id()}"] = _discard(self.sp)
self.sp.setTime(0.0)
self.sp.setState(self.sp.assembleState())
self.prefab_root = PrefabRoot("prefab_root", Config(), Context(), Params())
[docs]
@classmethod
def fromDS(cls, sim_ds: "SimDS") -> "Sim":
"""Create a Sim instance from its associated DataStruct.
Parameters
----------
sim_ds : SimDS
The DataStruct to create the Sim from.
Returns
-------
Sim
A new instance of Sim created from the provided DataStruct.
"""
return sim_ds.toSim()
[docs]
def toDS(self) -> "SimDS":
"""Convert this Sim into a SimDS.
Returns
-------
SimDS
A DataStruct representation of this simulation.
"""
ps = self.mb.getScene()
collision_scenes = []
if ps is not None:
for s in ps.clientScenes():
if isinstance(s, CollisionScene):
collision_scenes.append(s.toDS())
return SimDS(
mb_ds=self.mb.toDS(),
sp_ds=self.sp.toDS(),
setup_graphics=self._gui_graphics_ds,
collision=collision_scenes,
prefabs=[p.toDS() for p in self.prefab_root._children],
)
[docs]
@classmethod
def fromBasicPrefabDS(
cls, basic_prefab_ds: BasicPrefabDS[ExtraInfoType]
) -> tuple["Sim", BasicPrefab[ExtraInfoType]]:
"""Create a Sim instance from its associated DataStruct.
Parameters
----------
basic_prefab_ds : BasicPrefabDS
The BasicPrefabDS to create a Simulation from.
Returns
-------
tuple[Sim, BasicPrefab[ExtraInfoType]]
A new instance of Sim created from the provided BasicPrefabDS, and the
associated BasicPrefab.
"""
fc = FrameContainer("root")
bc.at_exit_fns[f"_discard_fc_{fc.id()}"] = _discard(fc)
bp, sp = BasicPrefab[ExtraInfoType].createStandalone(basic_prefab_ds, fc)
mb = sp.getSubTree().multibody()
bc.at_exit_fns[f"_discard_mb_{mb.id()}"] = _discard(mb)
bc.at_exit_fns[f"_discard_sp_{sp.id()}"] = _discard(sp)
return Sim(mb, sp), bp
[docs]
def setupGraphics(
self,
*,
port: int = 29523,
axes: float = 1.0,
client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
| None = "auto",
origin_frame: Frame | None = None,
wait_for_clients: int = 0,
wait_for_clients_timeout: float = 0.0,
stick_parts: Literal["auto", "always", "never"] = "auto",
) -> tuple[Callable[[], None], WebScene]:
"""Set up the graphics for this simulation.
Parameters
----------
port : int = 29523
Port to bind the WebUI server to. Use 0 to request an arbitrary unused
port.
axes : float = 1.0
Length of axes visualization on root frame.
client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
Policy for launching a client.
"auto": pick the best option for the environment and OS
"electron": always launch the electron client
"notebook": always open an IFrame in IPython
"webbrowser": always open a browser tab
"selenium": always open a chrome driver using selenium
None: don't automatically open a client
Defaults to "auto".
origin_frame: Optional[Frame] = None
Frame to use as the world origin for the graphics scene. If None, use
the Multibody's virtualRoot.
wait_for_clients: int = 0
Number of client connections to wait for before continuing.
wait_for_clients_timeout : float = 0.0
Number of seconds to wait before raising an error if wait_for_clients
is positive.
stick_parts : Literal["auto", "always", "never"] = "auto"
Policy for creating stick parts. Defaults to "auto".
"auto": create stick parts if the ProxyScene is empty
"always": unconditionally create stick parts
"never": unconditionally don't create stick parts
Returns
-------
tuple[Callable, WebScene]
A tuple containing the a cleanup callable and the graphics scene.
"""
return self._setupGuiGraphics(
GraphicsSetupDS(
port=port,
axes=axes,
client_type=client_type,
origin_frame=origin_frame,
wait_for_clients=wait_for_clients,
wait_for_clients_timeout=wait_for_clients_timeout,
stick_parts=stick_parts,
)
)
[docs]
def setupGui(
self,
port=29534,
client_type: Literal["auto", "electron", "notebook", "webbrowser", "selenium"]
| None = "auto",
wait_for_clients: int = 0,
wait_for_clients_timeout: float = 0.0,
stick_parts: Literal["auto", "always", "never"] = "auto",
stick_parts_config: StickPartsConfig | None = None,
name_to_label_map: dict[str, str] | None = None,
time_display_period: float | None = 0.01,
graphics_origin_frame: Frame | None = None,
) -> MultibodyWebUI:
"""Create the GUI for this multibody system.
Parameters
----------
self : Sim
The Sim instance
port : int
Port to bind the WebUI server to. Use 0 to request an arbitrary unused
port. Defaults to 29534.
client_type : Literal["auto", "electron", "notebook", "webbrowser", "selenium"] | None
Policy for launching a client.
"auto": pick the best option for the environment and OS
"electron": always launch the electron client
"notebook": always open an IFrame in IPython
"webbrowser": always open a browser tab
"selenium": always open a chrome driver using selenium
None: don't automatically open a client
Defaults to "auto".
wait_for_clients: int
Number of client connections to wait for before continuing.
Defaults to 0.
wait_for_clients_timeout : float
Number of seconds to wait before raising an error if wait_for_clients
is positive. Defaults to 0.
graphics_origin_frame : Optional[Frame]
If given, and the GUI needs to setup graphics, use this as the
origin frame instead of the multibody virtual root.
stick_parts : Literal["auto", "always", "never"] = "auto"
Policy for creating stick parts. Defaults to "auto".
"auto": create stick parts if the ProxyScene is empty
"always": unconditionally create stick parts
"never": unconditionally don't create stick parts
stick_parts_config : StickPartsConfig | None
Configuration parameters for the stick parts.
name_to_label_map: dict[str, str] | None
Dictionary defining the labels to use for each body in the visjs graphs
display
time_display_period : float | None
If not None and sp is given, add a time display refreshing
with this period in simulation time. Defaults to 0.01.
"""
gui = self._setupGuiGraphics(
GuiSetupDS(
port=port,
client_type=client_type,
wait_for_clients=wait_for_clients,
wait_for_clients_timeout=wait_for_clients_timeout,
stick_parts=stick_parts,
stick_parts_config=stick_parts_config,
name_to_label_map=name_to_label_map,
sp=self.sp,
sim=self, # pyright: ignore
time_display_period=time_display_period,
graphics_origin_frame=graphics_origin_frame,
)
)
return gui
@overload
def _setupGuiGraphics(self, setup: GraphicsSetupDS) -> tuple[Callable[[], None], WebScene]:
"""Set up graphics using the provided DataStruct.
Parameters
----------
setup : GraphicsSetupDS
The options for setting up the graphics.
Returns
-------
tuple[Callable[[], None], WebScene]
The cleanup callback and WebScene.
"""
...
@overload
def _setupGuiGraphics(self, setup: GuiSetupDS) -> MultibodyWebUI:
"""Set up the GUI using the provided DataStruct.
Parameters
----------
setup : GuiSetupDS
The options for setting up the GUI.
Returns
-------
MultibodyWebUI
The GUI.
"""
...
def _setupGuiGraphics(self, *args, **_):
"""Set up the GUI or graphics.
See overloads for details.
"""
gui_graphics = cast(GuiSetupDS | GraphicsSetupDS, args[0])
if self._gui_graphics_ds is not None:
if hasattr(self, "gui"):
warn("The GUI is already set up. Setting up again is rarely intended.")
elif isinstance(gui_graphics, GraphicsSetupDS):
warn("Graphics was already set up. Setting it up again is rarely intended.")
gui_only = False
if isinstance(self._gui_graphics_ds, GraphicsSetupDS) and not hasattr(self, "gui"):
gui_only = True
if self._gui_graphics_ds is not None:
gui_graphics.proxy_scene_id = self._gui_graphics_ds.proxy_scene_id
self._gui_graphics_ds = gui_graphics
if isinstance(self._gui_graphics_ds, GuiSetupDS):
if gui_only:
# If running GUI only, someone already set up the graphics. Just setup the gui and
# register its callback.
gui = self._gui_graphics_ds.setupGui(self.mb, self) # pyright: ignore
self.gui = gui
kmdl.GilRelease("gil_release", self.sp)
bc.at_exit_fns[f"_cleanup_gui_{id(gui)}"] = _discard(gui.close)
return gui
else:
# Set up the GUI and graphics. Separate the two for cleanup purposes.
gui, cleanup_graphics, _ = self._gui_graphics_ds.setupGui( # pyright: ignore
self.mb,
self, # pyright: ignore
setup_graphics_first=True,
)
self.gui = gui
# Register these cleanup functions if we own the StatePropagator
discard_sp_name = f"_discard_sp_{self.sp.id()}"
if discard_sp_name in bc.at_exit_fns:
ps = self.mb.getScene()
if ps is None:
raise ValueError("ProxyScene not created when it should have been.")
bc.at_exit_fns.insertBefore(
discard_sp_name, f"_cleanup_graphics_{ps.id()}", _discard(cleanup_graphics)
)
bc.at_exit_fns[f"_cleanup_gui_{id(gui)}"] = _discard(gui.close)
ps = cast(ProxyScene, self.mb.getScene())
# Add an UpdateProxyScene if we don't already have one
if not any(
isinstance(mdl, kmdl.UpdateProxyScene) for mdl in self.sp.getRegisteredModels()
):
kmdl.UpdateProxyScene("update_proxy_scene", self.sp, ps)
# Add a GilRelease if we don't already have one
if not any(
isinstance(mdl, kmdl.GilRelease) for mdl in self.sp.getRegisteredModels()
):
kmdl.GilRelease("gil_release", self.sp)
# Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
self._gui_graphics_ds.proxy_scene_id = ps.id()
return self.gui
else:
cleanup_graphics, _ = self._gui_graphics_ds.setupGraphics(self.mb)
# Register the cleanup function if we own the StatePropagator
discard_sp_name = f"_discard_sp_{self.sp.id()}"
if discard_sp_name in bc.at_exit_fns:
ps = self.mb.getScene()
if ps is None:
raise ValueError("ProxyScene not created when it should have been.")
bc.at_exit_fns.insertBefore(
discard_sp_name, f"_cleanup_graphics_{ps.id()}", _discard(cleanup_graphics)
)
ps = cast(ProxyScene, self.mb.getScene())
# Add an UpdateProxyScene if we don't already have one
if not any(
isinstance(mdl, kmdl.UpdateProxyScene) for mdl in self.sp.getRegisteredModels()
):
kmdl.UpdateProxyScene("update_proxy_scene", self.sp, ps)
# Set the proxy_scene_id, so we can associate the two when saving to a DataStruct
self._gui_graphics_ds.proxy_scene_id = ps.id()
return cleanup_graphics, _
[docs]
def addPlots(self, plot_data: list[SinglePlotData], period: float, title: str = "plots"):
"""Add Plots specified by the SinglePlotData.
Parameters
----------
plot_data : list[SinglePlotData]
The data to plot.
period : float
The period to update the plots at.
title : str
The title of the dash app
"""
self._dash_app = DashApp(plot_data, title=title, port=0)
mdl = kmdl.DataPlotter("data_plotter", self.sp, self._dash_app)
mdl.setPeriod(period)
if hasattr(self, "gui"):
plot_iframe = kw.IFrame(self.gui.router, self._dash_app.url)
self.gui.dock.addChild("plots", plot_iframe, self.gui.info_panel.wroot, "within")
[docs]
class SimDS(DataStruct):
"""DataStruct representation of a simulation.
Parameters
----------
sp_ds : StatePropagatorDS
The StatePropagator DataStruct.
mb_ds: SubGraphDS
The Multibody DataStruct.
setup_graphics: GuiSetupDS | GraphicsSetupDS | None
The GUI or graphics setup DataStruct.
collision: list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = [] # pyright: ignore
The CollisionScene DataStructs.
prefabs: list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]] # pyright: ignore
The Prefab DataStructs.
"""
sp_ds: StatePropagatorDS
mb_ds: SubGraphDS
setup_graphics: Optional[GuiSetupDS | GraphicsSetupDS] = Field(
discriminator="graphics_type", default=None
)
collision: list[SerializeAsAny[CollisionSceneDS[CollisionSceneType]]] = [] # pyright: ignore
prefabs: list[SerializeAsAny[PrefabDS[ConfigType, ContextType, ParamsType]]] # pyright: ignore
@model_validator(mode="after")
def _checkSceneConsistencty(self) -> "SimDS":
"""Validate that setup_graphics exist if collision scenes are present."""
if len(self.collision) > 0 and self.setup_graphics is None:
raise ValueError("You must have setup_graphics set if collision scenes exist on SimDS.")
return self
[docs]
def toSim(
self,
) -> Sim:
"""Create a simulation from this SimDS.
Returns
-------
Sim
The Sim created from this SimDS.
"""
fc = FrameContainer("root")
bc.at_exit_fns[f"_discard_fc_{fc.id()}"] = _discard(fc)
mb = self.mb_ds.toMultibody(fc)
bc.at_exit_fns[f"_discard_mb_{mb.id()}"] = _discard(mb)
mb.ensureHealthy()
if self.setup_graphics is not None:
if isinstance(self.setup_graphics, GraphicsSetupDS):
cleanup_graphics, _ = self.setup_graphics.setupGraphics(mb)
else:
cleanup_graphics, _ = self.setup_graphics.setupGraphicsOnly(mb, no_client=True)
ps = mb.getScene()
if ps is None:
raise ValueError("ProxyScene not created when it should have been.")
bc.at_exit_fns[f"_cleanup_graphics_{ps.id()}"] = _discard(cleanup_graphics)
# Create a DS of ProxyScnene to track it in the system for other models that use it.
proxy_ds = ProxySceneDS.fromProxyScene(cast(ProxyScene, mb.getScene()))
if self.setup_graphics.proxy_scene_id is not None and proxy_ds._id is not None:
# Reconfigure the IDs to match what they were previously for bookkeeping purposes
proxy_ds._all_objects_from_ids[self.setup_graphics.proxy_scene_id] = (
proxy_ds._all_objects_from_ids.pop(proxy_ds._id)
)
proxy_ds._id = self.setup_graphics.proxy_scene_id
p = cast(ProxyScene, mb.getScene())
if len(self.collision) > 0:
if p is None:
raise ValueError("Cannot register collision scenes without a proxy scene.")
for c in self.collision:
sc = c.toCollisionScene()
p.registerClientScene(sc, cast(Frame, mb.virtualRoot()))
sp = self.sp_ds.toStatePropagator(mb)
bc.at_exit_fns[f"_discard_sp_{sp.id()}"] = _discard(sp)
sp.setTime(0.0)
s = Sim(mb, sp)
if isinstance(self.setup_graphics, GuiSetupDS):
self.setup_graphics.sp = sp
gui = self.setup_graphics.setupGui(mb, s) # pyright: ignore
s.gui = gui
bc.at_exit_fns[f"_cleanup_gui_{id(gui)}"] = _discard(gui.close)
s._gui_graphics_ds = self.setup_graphics
for x in self.prefabs:
x.toPrefab(s.prefab_root)
return s
@overload
def toFile(
self,
file: Path | str | IO[bytes],
file_type: Optional[
Literal["json", "yaml", "yml", "h5", "hdf5", "pickle", "pck", "pcl"]
] = None,
meta: Meta | None = None,
) -> None:
"""Write the DataStruct to a file.
Parameters
----------
file : Path | str | IO[bytes]
Specify the file to write to. For each type, the file_type of the filename is used to determine
the type of file to write to, e.g., "yaml" for a YAML file or "h5" for an HDF5 file, if the optional
file_type argument is not specified. See below for details. In the case of IO[bytes], if the IO object
has a name attr, that will be used to lookup the file_type.
file_type : Optional[Literal["json","yaml","yml","h5","hdf5","pickle","pck","pcl"]]
This optional keyword argument is used to set the file type. If it is not specified, then then file type
is inferred from the name. If it can't be inferred from the name, then YAML is used.
meta: Meta | None = None
The meta data to include with this file. If None, then a default meta data object will be used.
"""
... # pragma: no cover. Will not be run by code coverage.
@overload
def toFile(self, g: h5py.Group, meta: Meta | None = None) -> None:
"""Write the DataStruct to a H5 group.
Parameters
----------
g : h5py.Group
The group to write to.
meta: Meta | None = None
The meta data to include with this file. If None, then a default meta data object will be used.
"""
... # pragma: no cover. Will not be run by code coverage.
@overload
def toFile(
self,
file: Path | str,
file_type: Optional[Literal["py"]] = None,
meta: Meta | None = None,
) -> None:
"""Write the DataStruct to a Python script file.
Only used if file_type == ".py" or the file name ends in ".py"
Parameters
----------
file : Path | str
Specify the file to write the Python script to.
file_type : Optional[Literal["py"]]
This optional keyword argument is used to set the file type. If it is not specified, then then file type
is inferred from the name.
meta: Meta | None = None
The meta data for this file type is ignored.
"""
... # pragma: no cover. Will not be run by code coverage.
[docs]
def toFile(self, *args, **kwargs): # pyright: ignore
"""Write the SubTreeDS to a file.
See overloads for details.
"""
if isinstance(args[0], h5py.Group):
return super().toFile(*args, **kwargs)
else:
file = args[0]
file_type = kwargs.get("file_type", None)
if file_type is None:
if isinstance(file, Path):
file_type = file.suffix[1:]
elif isinstance(file, str):
file_type = file.split(".")[-1].strip()
elif hasattr(file, "name"):
file_type = file.name.split(".")[-1].strip()
else:
file_type = "yaml"
if file_type == "py":
var_context = VarPythonScriptContext()
imports = {"import atexit", "from Karana.KUtils.Sim import Sim"}
new_imports, mb_code = self.mb_ds._toPythonScript(var_context)
imports |= new_imports
new_imports, sp_code = self.sp_ds._toPythonScript(var_context)
imports |= new_imports
code = f"""# This is an auto-generated file for the Sim
# This file was generated on {datetime.now()}
{"\n".join(list(imports))}
def populateMultibodyAndStatePropagator(mb: kd.Multibody, sp: kd.StatePropagator):
""\" Populate and configure the provided Multibody and StatePropagator.
Parameters
----------
mb : kd.Multibody
The Multibody to populate with physical bodies.
sp : kd.StatePropagator
The StatePropagator to populate and configure.
""\"
# This creates, adds, and connects physical bodies via hinges,
# also referred to as joints, to the provided Multibody instance.
# Often, the input `Multibody' instance is empty and contains just a
# virtual root body.
#
# The code below consists of repeated blocks - one per body. Each block
# defines the mass properties for the body, the parent hinge type and
# location on the body and its parent body, and the mesh geometries attached.
{textwrap.indent(mb_code, " ")}
_KARANA_COMMENT_SEPARATOR
# Populate and configure the StatePropagator. This will involve
# defining the type of solver to use for the multibody equations of
# motion, adding an integrator to use for state propagation,
# addition of KModels for interacting with the multibody dynamics,
# and timed events for execution during time advancement.
{textwrap.indent(sp_code, " ")}
if __name__ == "__main__":
# Create a simulation
sim = Sim()
# Populate the Multibody and StatePropagator for the Sim
populateMultibodyAndStatePropagator(sim.mb, sim.sp)
# Code to cleanup upon exit
def cleanup():
global sim
del sim
atexit.register(cleanup)
"""
writePythonScript(code, Path(file))
else:
return super().toFile(*args, **kwargs)