Source code for Karana.KUtils.Prefab

# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""Prefab base class and associated classes/functions."""

import numpy as np
from pydantic import (
    Field,
    ModelWrapValidatorHandler,
    SerializeAsAny,
    field_validator,
    model_validator,
)
from typing_extensions import TypeVar
from typing import Generic, Any, Self, Callable
from .DataStruct import DataStruct, NestedBaseMixin, PyClassDS, IdMixin
from Karana.Core import Base, CppWeakRef
from .FilesystemLikeDatabase import FilesystemLikeDatabase
from Karana.Dynamics import PhysicalBody, Node
from Karana.Dynamics.SOADyn_types import BodyDS, NodeDS


[docs] class Config(DataStruct, NestedBaseMixin): """Base class for Prefab Config.""" pass
[docs] class Context(DataStruct, NestedBaseMixin): """Base class for Prefab Context.""" pass
[docs] class Params(DataStruct, NestedBaseMixin): """Base class for Prefab Params.""" pass
ConfigType = TypeVar("ConfigType", bound=Config, default=Config) ContextType = TypeVar("ContextType", bound=Context, default=Context) ParamsType = TypeVar("ParamsType", bound=Params, default=Params)
[docs] class PrefabDS(DataStruct, NestedBaseMixin, IdMixin, Generic[ConfigType, ContextType, ParamsType]): """DataStruct for a Prefab. Classes derived from Prefab may derive from this and override toDS/fromDS as desired. However, this should work for most Prefabs as is. Parameters ---------- name: str Name of the instance. config: SerializeAsAny[Config] Config for the Prefab. context: SerializeAsAny[Context] Context for the Prefab. params: SerializeAsAny[Params] Params for the Prefab. parent: str Name of parent for this prefab relative to the first prefab that was not added by an addChildPrefab method. children: list[SerializeAsAny[DataStruct]] Children Prefabs of this Prefab. class_name: PyClassDS The class that this DataStruct represents. Used to create a new instance of the class from this DataStruct. """ name: str config: SerializeAsAny[ConfigType] context: SerializeAsAny[ContextType] params: SerializeAsAny[ParamsType] parent: str children_raw: list[SerializeAsAny[DataStruct]] = Field(alias="children") class_name: PyClassDS @property def children(self) -> list["PrefabDS[Any, Any, Any]"]: """Get the children Prefabs. Returns ------- list[PrefabDS[Any, Any, Any]] The children Prefabs. """ return self.children_raw # pyright: ignore @children.setter def children(self, children: list["PrefabDS[Any, Any, Any]"]): """Set the children Prefabs. Parameters ---------- children : list[Prefab[Any, Any, Any]] The value to set the children Prefabs to. """ self.children_raw = children # pyright: ignore @field_validator("children_raw", mode="after") def _childrenCheck(cls, v: list[DataStruct]) -> list[DataStruct]: """Ensure all children are Prefabs. Parameters ---------- v : list[DataStruct] The children to check. Returns ------- list[DataStruct] The children after they have been validated. """ for c in v: if not isinstance(c, PrefabDS): raise ValueError(f"Expected all children to be Prefab, but got {type(c)}") return v @model_validator(mode="wrap") @classmethod def _useCorrectType(cls, data: Any, handler: ModelWrapValidatorHandler[Self]) -> Self: """Use the correct type when deserializing. This uses the added field to get the most derived type and deserialize appropriately. This is provided that SerializeAsAny was used when serializing. Parameters ---------- data : Any The data to deserialize from. handler : ModelWrapValidatorHandler[Self] The handler used for deserialization of the base class. Returns ------- Self An instance of the most derived type. """ if isinstance(data, dict): d = data["children"] for k in range(len(d)): if isinstance(d[k], dict) and "ds_python_class" in d[k]: ds_python_class = PyClassDS(**d[k]["ds_python_class"]) klass = ds_python_class.toClass() d[k] = klass(**d[k]) return handler(data) else: return handler(data)
[docs] def toPrefab( self, parent_or_top: "Prefab[Any, Any, Any] | None", context: ContextType | None = None ) -> "Prefab[Any, Any, Any]": """Convert this PrefabDS to an instance of the associated Prefab. This will also create instances of any children Prefabs if applicable. Parameters ---------- parent_or_top: Prefab[Any, Any, Any] | None The parent prefab or top_prefab. First, we will try to find the parent string from the PrefabDS on this value. If it exists, we will use that as the parent. Otherwise, we will use this as the parent directly. If None, then we will assume the Prefab being created is the top-level Prefab. context : ContextType | None = None Optional context to override what was deserialized. If None, then use what was deserialized. Returns ------- Prefab[Any, Any, Any] An instance of the associated Prefab. """ # Get the class associated with the child klass = self.class_name.toClass() if parent_or_top is None: # If top_prefab is None, then parent is None, since this is # the top_prefab parent = None else: # Otherwise, look up the parent try: parent = parent_or_top._fs.getObject(self.parent, parent_or_top) except: parent = parent_or_top # Create an instance of it from the DataStruct if context is not None: obj = klass( self.name, self.config, context, self.params, parent ) # pyright: ignore - Pyright false positive. else: obj = klass( self.name, self.config, self.context, self.params, parent ) # pyright: ignore - Pyright false positive. # Add children for c in self.children: c.toPrefab(obj) return obj
[docs] class Prefab(Base, Generic[ConfigType, ContextType, ParamsType]): """The Prefab class holds a prefab for a simulation. Prefabs typically consist of one or more of the following: * Bodies * Models * Other prefabs Attributes ---------- config : ConfigType The configuration for the prefab. context : ContextType The context for the prefab. params : ParamsType The parameters for the Prefab. """ def __init__( self, name: str, config: ConfigType, context: ContextType, params: ParamsType, parent_prefab: "Prefab[Any, Any, Any] | None" = None, ): """Create an instance of Prefab. Parameters ---------- name : str The name for this Prefab. config : ConfigType The configuration for the prefab. context : ContextType The context for the prefab. params : ParamsType The parameters for the Prefab. """ # Call the base constructor super().__init__(name) # Assign values from incoming arguments self.config: ConfigType = config self.context: ContextType = context self.params: ParamsType = params # Variables to keep track of the parent and children self._parent: Prefab[Any, Any, Any] | None = None self._children: list[Prefab[Any, Any, Any]] = [] # This keeps track of children added by the addChildPrefabs method self._children_added_by_method: list[CppWeakRef[Prefab[Any, Any, Any]]] = [] # Variable to indicate whether the initialization code has run for this prefab or not self._initialized: bool = False # Variable to indicate whether we need to recompute the FilesystemLikeDatabase or not self._fs_recompute_v = True # Run any extra logic before setting parent and running compatibility check self.runBeforeInitialSetParent() # Set the parent. This will also run the compatibility check. self.parent = parent_prefab # Call setup functions if applicable # This should only run if: # 1) This is a top-level assembpy (no parent) # 2) Or, this is not a top-level assembly, but the parent has already been initialized if self.parent is None or (self.parent is not None and self.parent._initialized): self._addChildPrefabs() self._addMultibodyObjects() self._addKModels() self._connectKModels() self._setInitialized(True) @property def parent(self) -> "Prefab[Any, Any, Any] | None": """Returns the parent prefab of this prefab. Returns ------- "Prefab[Any, Any, Any] | None" The parent prefab of this prefab if it exists. """ return self._parent @parent.setter def parent(self, parent: "Prefab[Any, Any, Any] | None"): # Now, check for compatibility with parent if applicable # Always check the parent is compatible, even if it is None if not self.checkParentCompatible(parent): raise ValueError( f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}" ) # If we have a parent, then also check we are a compatible child. if parent is not None: if not parent.checkChildCompatible(self): raise ValueError( f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}" ) # If we already have a parent, then remove the link between it and this. # Also, recompute the FS on the old parent, as this child is no longer a part of it. if self.parent is not None: self.parent._fs_recompute = True self.parent._children.remove(self) # Add parent/child pair if compatiblity checks self._parent = parent if parent is not None: parent._children.append(self) # Reset _fs_recompute, as the filesystem needs to be recomputed for the hierarchy # that this prefab has been added to. self._fs_recompute = True @property def _fs_recompute(self) -> bool: """Get whether the filesystem needs to be recomputed. Returns ------- bool Whether the filesystem needs to be recomputed. """ # We need to get this at the top level of the hierarchy, so traverse up # and get the value at the top-level (where parent is None) if self.parent is not None: return self.parent._fs_recompute else: return self._fs_recompute_v @_fs_recompute.setter def _fs_recompute(self, value: bool): """Set whether the filesystem needs to be recomputed. Parameters ------- value: bool Whether the filesystem needs to be recomputed. """ # We need to set this at the top level of the hierarchy, so traverse up # and set the value at the top-level (where parent is None) if self.parent is not None: self.parent._fs_recompute = value else: self._fs_recompute_v = value @property def _fs(self) -> "FilesystemLikeDatabase[Prefab[Any, Any, Any]]": """Get the filesystem-like database for this prefab hierarchy. Returns ------- FilesystemLikeDatabase[Prefab[Any, Any, Any]] The filesystem-like database for this prefab hierarchy. """ # If this is not a top-level Prefab, then get the _fs from the top-level if self.parent is not None: return self.parent._fs # If we are here, then this is a top-level prefab. Only recompute if we have to # i.e., self._fs_recompute is True. if self._fs_recompute: # Create the FilesystemLikeDatabase self._fs_v = FilesystemLikeDatabase[Prefab[Any, Any, Any]]() # Add ourselves as the root self._fs_v.addObject(self.name(), self, "/") # Add all children recursively def _recurse( p: Prefab[Any, Any, Any], _fs: FilesystemLikeDatabase[Prefab[Any, Any, Any]] ): for c in p._children: _fs.addObject(c.name(), c, p) _recurse(c, _fs) _recurse(self, self._fs_v) self._fs_recompute = False return self._fs_v
[docs] def runBeforeInitialSetParent(self): """Add any logic/commands that should run before the parent Prefab is set to this method. This method runs during construction just before the parent Prefab is set. """ pass
[docs] def checkParentCompatible(self, parent: "Prefab[Any, Any, Any] | None") -> bool: """Check whether the parent Prefab this Prefab is becoming a child of is compatible. Parameters ---------- parent : Prefab | None The parent Prefab this Prefab is becoming a child of. Can be None if there is no parent. Returns ------- bool True if the parent prefab is compatible, False otherwise. """ return True
[docs] def checkChildCompatible(self, child: "Prefab[Any, Any, Any]") -> bool: """Check whether Prefab becoming a child of this Prefab is compatible. Parameters ---------- child: Prefab The Prefab becoming a child of this Prefab. Returns ------- bool True if the child prefab is compatible, False otherwise. """ return True
[docs] def addChildPrefabs(self) -> None: """Add child prefabs to this prefab. Override this method and add any child prefabs needed by this prefab. """ pass
def _addChildPrefabs(self) -> None: """Call addChildPrefabs on this and all children recursively.""" children_before = set(self._children) self.addChildPrefabs() children_after = set(self._children) self._children_added_by_method = [CppWeakRef(x) for x in children_after - children_before] for c in self._children: c._addChildPrefabs()
[docs] def addMultibodyObjects(self) -> None: """Add any multibody objects used by this prefab. Override this method and add any multibody objects (e.g., PhysicalBodys, Nodes, etc.) used by this prefab. """ pass
def _addMultibodyObjects(self) -> None: """Call addMultibodyObjects on this and all children recursively.""" self.addMultibodyObjects() for c in self._children: c._addMultibodyObjects()
[docs] def addKModels(self) -> None: """Add KModels needed by this prefab. Override this method and add any KModels needed by this prefab. """ pass
def _addKModels(self) -> None: """Call addKModels on this and all children recursively.""" self.addKModels() for c in self._children: c._addKModels()
[docs] def connectKModels(self) -> None: """Add any connections needed by KModels. Override this method and add logic to connect KModels in this prefab to other KModels. """ pass
def _connectKModels(self) -> None: """Call connectKModels on this and all children recursively.""" self.connectKModels() for c in self._children: c._connectKModels() def _setInitialized(self, value: bool) -> None: """Set this Prefab and its children's initialized status to value. Parameters ---------- value : bool The value to set the _initialized member to. """ self._initialized = value for c in self._children: c._setInitialized(value)
[docs] def addBody( self, parent_body: PhysicalBody, child_body: PhysicalBody | BodyDS, ) -> PhysicalBody: """Add a body to this Prefab. If this body already exists, just ensure it has the correct parent. Otherwise, create it with the correct parent. Parameters ---------- parent_body : PhysicalBody The parent body to attach the new body to. child_body : PhysicalBody | BodyDS The child_body to add. If BodyDS, then a new body is created; otherwise, we check this has the correct parent and return it. Returns ------- PhysicalBody The body that was either checked or created. """ if isinstance(child_body, PhysicalBody): if child_body.physicalParentBody() is parent_body: return child_body else: raise ValueError( f"The child body provided, {child_body.name()}, is not the child of the provided parent body, {parent_body.name()}. The parent is {child_body.physicalParentBody().name()}." ) else: return child_body.toBody(parent_body)
[docs] def addNode( self, parent_body: PhysicalBody, node: Node | NodeDS, ) -> Node: """Add a node to this Prefab. If this node already exists, just ensure it has the correct parent. Otherwise, create it with the correct parent. Parameters ---------- parent_body : PhysicalBody The parent node to attach the new node to. child_node : Node | NodeDS The child_node to add. If NodeDS, then a new node is created; otherwise, we check this has the correct parent and return it. Returns ------- Node The node that was either checked or created. """ if isinstance(node, Node): if node.parentBody() is parent_body: return node else: raise ValueError( f"The node provided, {node.name()}, is not the child of the provided parent body, {parent_body.name()}. The parent is {node.parentBody().name()}." ) else: return node.toNode(parent_body)
[docs] def toDS(self) -> PrefabDS[ConfigType, ContextType, ParamsType]: """Create a PrefabDS from this Prefab. Returns ------- PrefabDS The DataStruct that represents this Prefab. """ parents = [] children_added_by_method = [] p = self.parent while p is not None: children_added_by_method += [ x() for x in p._children_added_by_method if x() is not None ] parents.append(p) p = p.parent effective_parent = None for p in parents: if p not in children_added_by_method: effective_parent = p break def _getNameFromParent(name: str, p: Prefab[Any, Any, Any] | None) -> str: if p is None or p is effective_parent: # * If p is None, then we hit the top of the hierarchy and are done # * If p is the effective parent, then we have the location of this # prefab relative to the effective parent we want to be added to. return name else: if name == "": name = p.name() else: name = f"{p.name()}/{name}" return _getNameFromParent(name, p.parent) parent_name = _getNameFromParent("", self.parent) def _addChildren(p: Prefab[Any, Any, Any]) -> list[Prefab[Any, Any, Any]]: children = [] self_added_prefabs = [x() for x in p._children_added_by_method] for c in p._children: if c not in self_added_prefabs: children.append(c) else: children += _addChildren(c) return children return PrefabDS[type(self.config), type(self.context), type(self.params)]( name=self.name(), config=self.config.model_copy(deep=True), context=self.context.model_copy(deep=True), params=self.params.model_copy(deep=True), parent=parent_name, children=[c.toDS() for c in _addChildren(self)], class_name=PyClassDS.fromObj(self), )
[docs] @classmethod def fromDS( cls, data_struct: PrefabDS[ConfigType, ContextType, ParamsType], parent_or_top: "Prefab[Any, Any, Any] | None", *_, **kwargs, ) -> Self: """Convert the PrefabDS to an instance of this Prefab. This will also create instances of the associated children if applicable. Parameters ---------- data_struct : PrefabDS The PrefabDS to create an instance of this Prefab from. parent_or_top: Prefab[Any, Any, Any] | None The parent prefab or top_prefab. First, we will try to find the parent string from the PrefabDS on this value. If it exists, we will use that as the parent. Otherwise, we will use this as the parent directly. If None, then we will assume the Prefab being created is the top-level Prefab. Returns ------- Self An instance of this Prefab. """ return data_struct.toPrefab(parent_or_top) # pyright: ignore - False positive
[docs] def requestFromHierarchy[ T ]( self, path: str, fn: "Callable[[Prefab[Any, Any, Any]], T | None]", path_root: "Prefab[Any, Any, Any] | None" = None, ) -> list[T]: """Make a request to the current hierarchy of prefabs. This works by finding all the prefabs in the hierarchy that match the given path as searched from the path_root. If the path_root is None, then the root of the Prefab hierarchy is used. The provided `fn` is run on all the returned Prefab's, and any non-None values returned from it are combined into a list and returned. Parameters ---------- path : str The path to use when finding prefabs in the hierarchy. This supports filesystem-like syntax, so globbing, wildcards, etc. can be used. fn : Callable[[Prefab[Any, Any, Any]], T | None] The function to run on each Prefab matching path from the hierarchy. path_root : "Prefab[Any, Any, Any] | None" The root to search from. If None is provided, then the root of the hierarchy is used. Returns ------- list[T] A list of objects returned by user provided function mapped over the Prefabs matching the path query. """ ret: list[T] = [] for prefab in self._fs.getObjects(path, path_root): val = fn(prefab) if val is not None: ret.append(val) return ret
[docs] def requestUniqueFromHierarchy[ T ]( self, path: str, fn: "Callable[[Prefab[Any, Any, Any]], T | None]", path_root: "Prefab[Any, Any, Any] | None" = None, ) -> T: """Make a request to the current hierarchy of prefabs. This works by finding all the prefabs in the hierarchy that match the given path as searched from the path_root. If the path_root is None, then the root of the Prefab hierarchy is used. The provided `fn` is run on all the returned Prefabs, if one, unique non-None value is returned from applying `fn` to these Prefabs, then it is returned. If there is not one, unique non-None value, then an error message is thrown. Parameters ---------- path : str The path to use when finding prefabs in the hierarchy. This supports filesystem-like syntax, so globbing, wildcards, etc. can be used. fn : Callable[[Prefab[Any, Any, Any]], T | None] The function to run on each Prefab matching path from the hierarchy. path_root : "Prefab[Any, Any, Any] | None" The root to search from. If None is provided, then the root of the hierarchy is used. Returns ------- T A unique object returned by the user provided function mapped over the Prefabs matching the path query. """ # Find all values T from applying the function to the prefabs candidates: list[T] = self.requestFromHierarchy(path, fn, path_root=path_root) # Ensure that the return value is unique, and return it if it is. # Otherwise, raise an error. len_candidates = len(candidates) if len_candidates == 1: return candidates[0] elif len_candidates == 0: raise ValueError("Did not find any values that match the request.") else: raise ValueError(f"Found {len_candidates} values rather than one unique value.")
[docs] def requestClosestFromHierarchy[ T ]( self, path: str, fn: "Callable[[Prefab[Any, Any, Any]], T | None]", path_root: "Prefab[Any, Any, Any] | None" = None, ) -> T: """Make a request to the current hierarchy of prefabs. This works by finding all the prefabs in the hierarchy that match the given path as searched from the path_root. If the path_root is None, then the root of the Prefab hierarchy is used. The provided `fn` is run on all the returned Prefabs. Then, the one that is closest is returned. If there is a tie for the closest or None are returned, then an error is throw. Parameters ---------- path : str The path to use when finding prefabs in the hierarchy. This supports filesystem-like syntax, so globbing, wildcards, etc. can be used. fn : Callable[[Prefab[Any, Any, Any]], T | None] The function to run on each Prefab matching path from the hierarchy. path_root : "Prefab[Any, Any, Any] | None" The root to search from. If None is provided, then the root of the hierarchy is used. Returns ------- T The closest object returned by the user provided function mapped over the Prefabs matching the path query. """ # Find all values T from applying the function to the prefabs candidates: list[tuple[T, int]] = [] distances: list[int] = [] for prefab, dist in self._fs.getObjectsAndDists(path, path_root): val = fn(prefab) if val is not None: candidates.append((val, dist)) distances.append(dist) if len(distances) == 0: raise ValueError( "Could not find any values that match the user-supplied function mapped over the given path for Prefabs" ) distances_np = np.array(distances, np.int32) argmin = np.argmin(distances_np) count = np.count_nonzero(distances_np == distances_np[argmin]) if count == 1: return candidates[argmin][0] else: raise ValueError( f"There was a {count}-way tie for closest value for the given user-supplied ufuntion mapped over the given path for Prefabs" )