Source code for Karana.KUtils.Prefab

# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""Prefab base class and associated classes/functions."""

from importlib import import_module
from pydantic import SerializeAsAny
from typing_extensions import TypeVar
from typing import Generic, Any, Self, Callable
from copy import deepcopy
from .DataStruct import DataStruct, NestedBaseMixin
from Karana.Core import Base
from .FilesystemLikeDatabase import FilesystemLikeDatabase


class Config(DataStruct, NestedBaseMixin):
    """Base class for Prefab Config.

    Declares no fields of its own. It serves as the bound and the default of
    the ``ConfigType`` type variable used by ``PrefabDS`` and ``Prefab``.
    """
class Context(DataStruct, NestedBaseMixin):
    """Base class for Prefab Context.

    Declares no fields of its own. It serves as the bound and the default of
    the ``ContextType`` type variable used by ``PrefabDS`` and ``Prefab``.
    """
class Params(DataStruct, NestedBaseMixin):
    """Base class for Prefab Params.

    Declares no fields of its own. It serves as the bound and the default of
    the ``ParamsType`` type variable used by ``PrefabDS`` and ``Prefab``.
    """
# Type variables used to parameterize PrefabDS and Prefab. Each one is bound to
# its base class and also defaults to it, so an unparameterized Prefab/PrefabDS
# falls back to the plain Config/Context/Params types.
ConfigType = TypeVar(
    "ConfigType",
    bound=Config,
    default=Config,
)
ContextType = TypeVar(
    "ContextType",
    bound=Context,
    default=Context,
)
ParamsType = TypeVar(
    "ParamsType",
    bound=Params,
    default=Params,
)
class PrefabDS(DataStruct, NestedBaseMixin, Generic[ConfigType, ContextType, ParamsType]):
    """DataStruct for a Prefab.

    Classes derived from Prefab may derive from this and override toDS/fromDS as desired.
    However, this should work for most Prefabs as is.

    Parameters
    ----------
    name : str
        Name of the instance.
    config : SerializeAsAny[ConfigType]
        Config for the Prefab.
    context : SerializeAsAny[ContextType]
        Context for the Prefab.
    params : SerializeAsAny[ParamsType]
        Params for the Prefab.
    children : list[SerializeAsAny["PrefabDS[Any, Any, Any]"]]
        Children Prefabs of this Prefab.
    class_name : str
        The fully qualified class name. Used to create a new instance of the class from
        this DataStruct.
    """

    name: str
    config: SerializeAsAny[ConfigType]
    context: SerializeAsAny[ContextType]
    params: SerializeAsAny[ParamsType]
    children: list[SerializeAsAny["PrefabDS[Any, Any, Any]"]]
    class_name: str

    def toPrefab(self, parent: "Prefab[Any, Any, Any] | None") -> "Prefab[Any, Any, Any]":
        """Convert this PrefabDS to an instance of the associated Prefab.

        This will also create instances of any children Prefabs if applicable.

        The class is located from ``class_name``. Everything before the last dot is tried
        as the module path first. If that module cannot be found, progressively shorter
        module paths are tried and the remaining components are looked up as nested
        attributes, so that classes defined inside other classes (whose qualified name
        contains dots) can be resolved as well.

        Parameters
        ----------
        parent : Prefab[Any, Any, Any] | None
            The parent to attach this Prefab to.

        Returns
        -------
        Prefab[Any, Any, Any]
            An instance of the associated Prefab.

        Raises
        ------
        ValueError
            If ``class_name`` does not contain a module part.
        ModuleNotFoundError
            If no module path derived from ``class_name`` leads to the class.
        AttributeError
            If the module is importable but does not contain the class.
        """
        parts = self.class_name.split(".")
        if len(parts) < 2:
            raise ValueError(
                f"Cannot create a Prefab from class_name '{self.class_name}': expected a "
                "fully qualified name of the form 'module.ClassName'."
            )

        # Locate the class. The first iteration treats everything but the last component
        # as the module; later iterations only run if that import fails.
        klass: Any = None
        import_error: ModuleNotFoundError | None = None
        for split_at in range(len(parts) - 1, 0, -1):
            try:
                klass = import_module(".".join(parts[:split_at]))
            except ModuleNotFoundError as err:
                # Keep the error for the longest module path. It is the most informative
                # one to report if none of the shorter paths work out either.
                if import_error is None:
                    import_error = err
                continue
            try:
                for attr in parts[split_at:]:
                    klass = getattr(klass, attr)
            except AttributeError:
                # If the very first import worked, the class is simply missing from the
                # module. Otherwise the fallback did not help, so report the import failure.
                if import_error is None:
                    raise
                raise import_error from None
            break
        else:
            # Only reached if every import attempt failed, so import_error is set.
            raise import_error  # pyright: ignore - Always set when every import failed.

        # Create an instance of it from the DataStruct
        obj = klass(
            self.name, self.config, self.context, self.params, parent
        )  # pyright: ignore - Pyright false positive.

        # Add children. Each child attaches itself to obj when it is constructed.
        # NOTE(review): the Prefab constructor also runs addChildPrefabs() for top-level
        # prefabs and for prefabs with an initialized parent. If an override creates
        # children there, they may end up alongside the ones created here - confirm.
        for child_ds in self.children:
            child_ds.toPrefab(obj)

        return obj
[docs] class Prefab(Base, Generic[ConfigType, ContextType, ParamsType]): """The Prefab class holds a prefab for a simulation. Prefabs typically consist of one or more of the following: * Bodies * Models * Other prefabs Attributes ---------- config : ConfigType The configuration for the prefab. context : ContextType The context for the prefab. params : ParamsType The parameters for the Prefab. """ def __init__( self, name: str, config: ConfigType, context: ContextType, params: ParamsType, parent_prefab: "Prefab[Any, Any, Any] | None" = None, ): """Create an instance of Prefab. Parameters ---------- name : str The name for this Prefab. config : ConfigType The configuration for the prefab. context : ContextType The context for the prefab. params : ParamsType The parameters for the Prefab. """ # Call the base constructor super().__init__(name) # Assign values from incoming arguments self.config: ConfigType = config self.context: ContextType = context self.params: ParamsType = params # Variables to keep track of the parent and children self._parent: Prefab[Any, Any, Any] | None = None self.children: list[Prefab[Any, Any, Any]] = [] # Variable to indicate whether the initialization code has run for this prefab or not self._initialized: bool = False # Variable to indicate whether we need to recompute the FilesystemLikeDatabase or not self._fs_recompute_v = True # Run any extra logic before setting parent and running compatibility check self.runBeforeInitialSetParent() # Set the parent. This will also run the compatibility check. 
self.parent = parent_prefab # Call setup functions if applicable # This should only run if: # 1) This is a top-level assembpy (no parent) # 2) Or, this is not a top-level assembly, but the parent has already been initialized if self.parent is None or (self.parent is not None and self.parent._initialized): self._addChildPrefabs() self._addMultibodyObjects() self._addKModels() self._connectKModels() self._setInitialized(True) @property def parent(self) -> "Prefab[Any, Any, Any] | None": """Returns the parent prefab of this prefab. Returns ------- "Prefab[Any, Any, Any] | None" The parent prefab of this prefab if it exists. """ return self._parent @parent.setter def parent(self, parent: "Prefab[Any, Any, Any] | None"): # Now, check for compatibility with parent if applicable # Always check the parent is compatible, even if it is None if not self.checkParentCompatible(parent): raise ValueError( f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}" ) # If we have a parent, then also check we are a compatible child. if parent is not None: if not parent.checkChildCompatible(self): raise ValueError( f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}" ) # If we already have a parent, then remove the link between it and this. # Also, recompute the FS on the old parent, as this child is no longer a part of it. if self.parent is not None: self.parent._fs_recompute = True self.parent.children.remove(self) # Add parent/child pair if compatiblity checks self._parent = parent if parent is not None: parent.children.append(self) # Reset _fs_recompute, as the filesystem needs to be recomputed for the hierarchy # that this prefab has been added to. self._fs_recompute = True @property def _fs_recompute(self) -> bool: """Get whether the filesystem needs to be recomputed. Returns ------- bool Whether the filesystem needs to be recomputed. 
""" # We need to get this at the top level of the hierarchy, so traverse up # and get the value at the top-level (where parent is None) if self.parent is not None: return self.parent._fs_recompute else: return self._fs_recompute_v @_fs_recompute.setter def _fs_recompute(self, value: bool): """Set whether the filesystem needs to be recomputed. Parameters ------- value: bool Whether the filesystem needs to be recomputed. """ # We need to set this at the top level of the hierarchy, so traverse up # and set the value at the top-level (where parent is None) if self.parent is not None: self.parent._fs_recompute = value else: self._fs_recompute_v = value @property def _fs(self) -> "FilesystemLikeDatabase[Prefab[Any, Any, Any]]": """Get the filesystem-like database for this prefab hierarchy. Returns ------- FilesystemLikeDatabase[Prefab[Any, Any, Any]] The filesystem-like database for this prefab hierarchy. """ # If this is not a top-level Prefab, then get the _fs from the top-level if self.parent is not None: return self.parent._fs # If we are here, then this is a top-level prefab. Only recompute if we have to # i.e., self._fs_recompute is True. if self._fs_recompute: # Create the FilesystemLikeDatabase self._fs_v = FilesystemLikeDatabase[Prefab[Any, Any, Any]]() # Add ourselves as the root self._fs_v.addObject(self.name(), self, "/") # Add all children recursively def _recurse( p: Prefab[Any, Any, Any], _fs: FilesystemLikeDatabase[Prefab[Any, Any, Any]] ): for c in p.children: _fs.addObject(c.name(), c, p) _recurse(c, _fs) _recurse(self, self._fs_v) self._fs_recompute = False return self._fs_v
[docs] def runBeforeInitialSetParent(self): """Add any logic/commands that should run before the parent Prefab is set to this method. This method runs during construction just before the parent Prefab is set. """ pass
[docs] def checkParentCompatible(self, parent: "Prefab[Any, Any, Any] | None") -> bool: """Check whether the parent Prefab this Prefab is becoming a child of is compatible. Parameters ---------- parent : Prefab | None The parent Prefab this Prefab is becoming a child of. Can be None if there is no parent. Returns ------- bool True if the parent prefab is compatible, False otherwise. """ return True
[docs] def checkChildCompatible(self, child: "Prefab[Any, Any, Any]") -> bool: """Check whether Prefab becoming a child of this Prefab is compatible. Parameters ---------- child: Prefab The Prefab becoming a child of this Prefab. Returns ------- bool True if the child prefab is compatible, False otherwise. """ return True
[docs] def addChildPrefabs(self) -> None: """Add child prefabs to this prefab. Override this method and add any child prefabs needed by this prefab. """ pass
def _addChildPrefabs(self) -> None: """Call addChildPrefabs on this and all children recursively.""" self.addChildPrefabs() for c in self.children: c._addChildPrefabs()
[docs] def addMultibodyObjects(self) -> None: """Add any multibody objects used by this prefab. Override this method and add any multibody objects (e.g., PhysicalBodys, Nodes, etc.) used by this prefab. """ pass
def _addMultibodyObjects(self) -> None: """Call addMultibodyObjects on this and all children recursively.""" self.addMultibodyObjects() for c in self.children: c._addMultibodyObjects()
[docs] def addKModels(self) -> None: """Add KModels needed by this prefab. Override this method and add any KModels needed by this prefab. """ pass
def _addKModels(self) -> None: """Call addKModels on this and all children recursively.""" self.addKModels() for c in self.children: c._addKModels()
[docs] def connectKModels(self) -> None: """Add any connections needed by KModels. Override this method and add logic to connect KModels in this prefab to other KModels. """ pass
def _connectKModels(self) -> None: """Call connectKModels on this and all children recursively.""" self.connectKModels() for c in self.children: c._connectKModels() def _setInitialized(self, value: bool) -> None: """Set this Prefab and its children's initialized status to value. Parameters ---------- value : bool The value to set the _initialized member to. """ self._initialized = value for c in self.children: c._setInitialized(value)
[docs] def toDS(self) -> PrefabDS[Config, Context, Params]: """Create a PrefabDS from this Prefab. Returns ------- PrefabDS The DataStruct that represents this Prefab. """ return PrefabDS[Config, Context, Params]( name=self.name(), config=self.config.model_copy(deep=True), context=self.context.model_copy(deep=True), params=self.params.model_copy(deep=True), children=[c.toDS() for c in self.children], class_name=self.__class__.__module__ + "." + self.__class__.__qualname__, )
[docs] @classmethod def fromDS(cls, data_struct: PrefabDS, parent: "Prefab[Any, Any, Any] | None") -> Self: """Convert the PrefabDS to an instance of this Prefab. This will also create instances of the associated children if applicable. Parameters ---------- data_struct : PrefabDS The PrefabDS to create an instance of this Prefab from. parent : Prefab[Any, Any, Any] | None The parent to attach this Prefab to. Returns ------- Self An instance of this Prefab. """ return data_struct.toPrefab(parent) # pyright: ignore - False positive
[docs] def requestFromHierarchy[ T ]( self, path: str, fn: "Callable[[Prefab[Any, Any, Any]], T | None]", path_root: "Prefab[Any, Any, Any] | None" = None, ) -> list[T]: """Make a request to the current hierarchy of prefabs. This works by finding all the prefabs in the hierarchy that match the given path as searched from the path_root. If the path_root is None, then the root of the Prefab hierarchy is used. The provided `fn` is run on all the returned Prefab's, and any non-None values returned from it are combined into a list and returned. Parameters ---------- path : str The path to use when finding prefabs in the hierarchy. This supports filesystem-like syntax, so globbing, wildcards, etc. can be used. fn : Callable[[Prefab[Any, Any, Any]], T | None] The function to run on each Prefab matching path from the hierarchy. path_root : "Prefab[Any, Any, Any] | None" The root to search from. If None is provided, then the root of the hierarchy is used. Returns ------- list[T] A list of objects returned by user provided function mapped over the Prefabs matching the path query. """ ret: list[T] = [] for prefab in self._fs.getObjects(path, path_root): val = fn(prefab) if val is not None: ret.append(val) return ret