# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Prefab base class and associated classes/functions."""
from importlib import import_module
from pydantic import SerializeAsAny
from typing_extensions import TypeVar
from typing import Generic, Any, Self, Callable
from copy import deepcopy
from .DataStruct import DataStruct, NestedBaseMixin
from Karana.Core import Base
from .FilesystemLikeDatabase import FilesystemLikeDatabase
[docs]
class Config(DataStruct, NestedBaseMixin):
    """Base class for Prefab Config.

    Declares no fields of its own. It serves as both the bound and the
    default of the ``ConfigType`` type variable defined in this module.
    """
[docs]
class Context(DataStruct, NestedBaseMixin):
    """Base class for Prefab Context.

    Declares no fields of its own. It serves as both the bound and the
    default of the ``ContextType`` type variable defined in this module.
    """
[docs]
class Params(DataStruct, NestedBaseMixin):
    """Base class for Prefab Params.

    Declares no fields of its own. It serves as both the bound and the
    default of the ``ParamsType`` type variable defined in this module.
    """
# Type variables used to parameterize PrefabDS and Prefab. Each one is bounded
# by the matching base class, and `default=` (PEP 696, provided here by
# typing_extensions.TypeVar) makes an unparameterized use fall back to that
# same base class.
ConfigType = TypeVar("ConfigType", bound=Config, default=Config)
ContextType = TypeVar("ContextType", bound=Context, default=Context)
ParamsType = TypeVar("ParamsType", bound=Params, default=Params)
[docs]
class PrefabDS(DataStruct, NestedBaseMixin, Generic[ConfigType, ContextType, ParamsType]):
    """DataStruct for a Prefab.

    Classes derived from Prefab may derive from this and override toDS/fromDS as desired. However,
    this should work for most Prefabs as is.

    Parameters
    ----------
    name: str
        Name of the instance.
    config: SerializeAsAny[Config]
        Config for the Prefab.
    context: SerializeAsAny[Context]
        Context for the Prefab.
    params: SerializeAsAny[Params]
        Params for the Prefab.
    children: list[SerializeAsAny["PrefabDS"]]
        Children Prefabs of this Prefab.
    class_name: str
        The fully qualified class name (module path followed by the class'
        qualified name). Used to create a new instance of the class from
        this DataStruct.
    """

    name: str
    config: SerializeAsAny[ConfigType]
    context: SerializeAsAny[ContextType]
    params: SerializeAsAny[ParamsType]
    children: list[SerializeAsAny["PrefabDS[Any, Any, Any]"]]
    class_name: str

    def toPrefab(self, parent: "Prefab[Any, Any, Any] | None") -> "Prefab[Any, Any, Any]":
        """Convert this PrefabDS to an instance of the associated Prefab.

        This will also create instances of any children Prefabs if applicable.

        The class is located from ``class_name`` by importing the longest
        dotted prefix that is an importable module and then walking the
        remaining components as attributes. This allows nested classes
        (whose qualified name itself contains dots) to be resolved.

        Parameters
        ----------
        parent : Prefab[Any, Any, Any] | None
            The parent to attach this Prefab to.

        Returns
        -------
        Prefab[Any, Any, Any]
            An instance of the associated Prefab.

        Raises
        ------
        ValueError
            If ``class_name`` does not contain a module part.
        ModuleNotFoundError
            If no prefix of ``class_name`` can be imported as a module.
        AttributeError
            If the module was imported but does not provide the class.
        """
        parts = self.class_name.split(".")
        if len(parts) < 2:
            raise ValueError(
                f"class_name {self.class_name!r} must be a fully qualified "
                "'module.ClassName' path"
            )

        # Try the longest module prefix first (the common "module.Class" case),
        # then progressively shorter ones so that "module.Outer.Inner" works.
        first_error: ModuleNotFoundError | None = None
        for split in range(len(parts) - 1, 0, -1):
            module_name = ".".join(parts[:split])
            try:
                target: Any = import_module(module_name)
            except ModuleNotFoundError as err:
                missing = err.name
                # Only treat the failure as "this prefix is not a module". A
                # missing dependency raised from inside the imported module
                # must propagate unchanged.
                if missing is None or not (
                    module_name == missing or module_name.startswith(missing + ".")
                ):
                    raise
                if first_error is None:
                    first_error = err
                continue
            for attr in parts[split:]:
                target = getattr(target, attr)
            klass = target
            break
        else:
            # Every candidate prefix failed to import. Re-raise the error from
            # the longest prefix, which is what a plain "module.Class" lookup
            # would have reported.
            if first_error is None:
                raise ModuleNotFoundError(f"Cannot import a module for {self.class_name!r}")
            raise first_error

        # Create an instance of it from the DataStruct
        obj = klass(
            self.name, self.config, self.context, self.params, parent
        )  # pyright: ignore - Pyright false positive.

        # NOTE(review): Prefab.toDS serializes every entry of `children`. If the
        # constructor above already created children (e.g. through an
        # overridden addChildPrefabs), adding the serialized children below may
        # duplicate them - confirm the intended round-trip behavior.
        for child_ds in self.children:
            child_ds.toPrefab(obj)
        return obj
[docs]
class Prefab(Base, Generic[ConfigType, ContextType, ParamsType]):
"""The Prefab class holds a prefab for a simulation.
Prefabs typically consist of one or more of the following:
* Bodies
* Models
* Other prefabs
Attributes
----------
config : ConfigType
The configuration for the prefab.
context : ContextType
The context for the prefab.
params : ParamsType
The parameters for the Prefab.
"""
def __init__(
self,
name: str,
config: ConfigType,
context: ContextType,
params: ParamsType,
parent_prefab: "Prefab[Any, Any, Any] | None" = None,
):
"""Create an instance of Prefab.
Parameters
----------
name : str
The name for this Prefab.
config : ConfigType
The configuration for the prefab.
context : ContextType
The context for the prefab.
params : ParamsType
The parameters for the Prefab.
"""
# Call the base constructor
super().__init__(name)
# Assign values from incoming arguments
self.config: ConfigType = config
self.context: ContextType = context
self.params: ParamsType = params
# Variables to keep track of the parent and children
self._parent: Prefab[Any, Any, Any] | None = None
self.children: list[Prefab[Any, Any, Any]] = []
# Variable to indicate whether the initialization code has run for this prefab or not
self._initialized: bool = False
# Variable to indicate whether we need to recompute the FilesystemLikeDatabase or not
self._fs_recompute_v = True
# Run any extra logic before setting parent and running compatibility check
self.runBeforeInitialSetParent()
# Set the parent. This will also run the compatibility check.
self.parent = parent_prefab
# Call setup functions if applicable
# This should only run if:
# 1) This is a top-level assembpy (no parent)
# 2) Or, this is not a top-level assembly, but the parent has already been initialized
if self.parent is None or (self.parent is not None and self.parent._initialized):
self._addChildPrefabs()
self._addMultibodyObjects()
self._addKModels()
self._connectKModels()
self._setInitialized(True)
@property
def parent(self) -> "Prefab[Any, Any, Any] | None":
"""Returns the parent prefab of this prefab.
Returns
-------
"Prefab[Any, Any, Any] | None"
The parent prefab of this prefab if it exists.
"""
return self._parent
@parent.setter
def parent(self, parent: "Prefab[Any, Any, Any] | None"):
# Now, check for compatibility with parent if applicable
# Always check the parent is compatible, even if it is None
if not self.checkParentCompatible(parent):
raise ValueError(
f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}"
)
# If we have a parent, then also check we are a compatible child.
if parent is not None:
if not parent.checkChildCompatible(self):
raise ValueError(
f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}"
)
# If we already have a parent, then remove the link between it and this.
# Also, recompute the FS on the old parent, as this child is no longer a part of it.
if self.parent is not None:
self.parent._fs_recompute = True
self.parent.children.remove(self)
# Add parent/child pair if compatiblity checks
self._parent = parent
if parent is not None:
parent.children.append(self)
# Reset _fs_recompute, as the filesystem needs to be recomputed for the hierarchy
# that this prefab has been added to.
self._fs_recompute = True
@property
def _fs_recompute(self) -> bool:
"""Get whether the filesystem needs to be recomputed.
Returns
-------
bool
Whether the filesystem needs to be recomputed.
"""
# We need to get this at the top level of the hierarchy, so traverse up
# and get the value at the top-level (where parent is None)
if self.parent is not None:
return self.parent._fs_recompute
else:
return self._fs_recompute_v
@_fs_recompute.setter
def _fs_recompute(self, value: bool):
"""Set whether the filesystem needs to be recomputed.
Parameters
-------
value: bool
Whether the filesystem needs to be recomputed.
"""
# We need to set this at the top level of the hierarchy, so traverse up
# and set the value at the top-level (where parent is None)
if self.parent is not None:
self.parent._fs_recompute = value
else:
self._fs_recompute_v = value
@property
def _fs(self) -> "FilesystemLikeDatabase[Prefab[Any, Any, Any]]":
"""Get the filesystem-like database for this prefab hierarchy.
Returns
-------
FilesystemLikeDatabase[Prefab[Any, Any, Any]]
The filesystem-like database for this prefab hierarchy.
"""
# If this is not a top-level Prefab, then get the _fs from the top-level
if self.parent is not None:
return self.parent._fs
# If we are here, then this is a top-level prefab. Only recompute if we have to
# i.e., self._fs_recompute is True.
if self._fs_recompute:
# Create the FilesystemLikeDatabase
self._fs_v = FilesystemLikeDatabase[Prefab[Any, Any, Any]]()
# Add ourselves as the root
self._fs_v.addObject(self.name(), self, "/")
# Add all children recursively
def _recurse(
p: Prefab[Any, Any, Any], _fs: FilesystemLikeDatabase[Prefab[Any, Any, Any]]
):
for c in p.children:
_fs.addObject(c.name(), c, p)
_recurse(c, _fs)
_recurse(self, self._fs_v)
self._fs_recompute = False
return self._fs_v
[docs]
def runBeforeInitialSetParent(self):
"""Add any logic/commands that should run before the parent Prefab is set to this method.
This method runs during construction just before the parent Prefab is set.
"""
pass
[docs]
def checkParentCompatible(self, parent: "Prefab[Any, Any, Any] | None") -> bool:
"""Check whether the parent Prefab this Prefab is becoming a child of is compatible.
Parameters
----------
parent : Prefab | None
The parent Prefab this Prefab is becoming a child of. Can be None if there
is no parent.
Returns
-------
bool
True if the parent prefab is compatible, False otherwise.
"""
return True
[docs]
def checkChildCompatible(self, child: "Prefab[Any, Any, Any]") -> bool:
"""Check whether Prefab becoming a child of this Prefab is compatible.
Parameters
----------
child: Prefab
The Prefab becoming a child of this Prefab.
Returns
-------
bool
True if the child prefab is compatible, False otherwise.
"""
return True
[docs]
def addChildPrefabs(self) -> None:
"""Add child prefabs to this prefab.
Override this method and add any child prefabs needed by this prefab.
"""
pass
def _addChildPrefabs(self) -> None:
"""Call addChildPrefabs on this and all children recursively."""
self.addChildPrefabs()
for c in self.children:
c._addChildPrefabs()
[docs]
def addMultibodyObjects(self) -> None:
"""Add any multibody objects used by this prefab.
Override this method and add any multibody objects (e.g., PhysicalBodys,
Nodes, etc.) used by this prefab.
"""
pass
def _addMultibodyObjects(self) -> None:
"""Call addMultibodyObjects on this and all children recursively."""
self.addMultibodyObjects()
for c in self.children:
c._addMultibodyObjects()
[docs]
def addKModels(self) -> None:
"""Add KModels needed by this prefab.
Override this method and add any KModels needed by this prefab.
"""
pass
def _addKModels(self) -> None:
"""Call addKModels on this and all children recursively."""
self.addKModels()
for c in self.children:
c._addKModels()
[docs]
def connectKModels(self) -> None:
"""Add any connections needed by KModels.
Override this method and add logic to connect KModels
in this prefab to other KModels.
"""
pass
def _connectKModels(self) -> None:
"""Call connectKModels on this and all children recursively."""
self.connectKModels()
for c in self.children:
c._connectKModels()
def _setInitialized(self, value: bool) -> None:
"""Set this Prefab and its children's initialized status to value.
Parameters
----------
value : bool
The value to set the _initialized member to.
"""
self._initialized = value
for c in self.children:
c._setInitialized(value)
[docs]
def toDS(self) -> PrefabDS[Config, Context, Params]:
"""Create a PrefabDS from this Prefab.
Returns
-------
PrefabDS
The DataStruct that represents this Prefab.
"""
return PrefabDS[Config, Context, Params](
name=self.name(),
config=self.config.model_copy(deep=True),
context=self.context.model_copy(deep=True),
params=self.params.model_copy(deep=True),
children=[c.toDS() for c in self.children],
class_name=self.__class__.__module__ + "." + self.__class__.__qualname__,
)
[docs]
@classmethod
def fromDS(cls, data_struct: PrefabDS, parent: "Prefab[Any, Any, Any] | None") -> Self:
"""Convert the PrefabDS to an instance of this Prefab.
This will also create instances of the associated children if applicable.
Parameters
----------
data_struct : PrefabDS
The PrefabDS to create an instance of this Prefab from.
parent : Prefab[Any, Any, Any] | None
The parent to attach this Prefab to.
Returns
-------
Self
An instance of this Prefab.
"""
return data_struct.toPrefab(parent) # pyright: ignore - False positive
[docs]
def requestFromHierarchy[
T
](
self,
path: str,
fn: "Callable[[Prefab[Any, Any, Any]], T | None]",
path_root: "Prefab[Any, Any, Any] | None" = None,
) -> list[T]:
"""Make a request to the current hierarchy of prefabs.
This works by finding all the prefabs in the hierarchy that match the given
path as searched from the path_root. If the path_root is None, then the root
of the Prefab hierarchy is used. The provided `fn` is run on all the returned
Prefab's, and any non-None values returned from it are combined into a list and
returned.
Parameters
----------
path : str
The path to use when finding prefabs in the hierarchy. This supports
filesystem-like syntax, so globbing, wildcards, etc. can be used.
fn : Callable[[Prefab[Any, Any, Any]], T | None]
The function to run on each Prefab matching path from the hierarchy.
path_root : "Prefab[Any, Any, Any] | None"
The root to search from. If None is provided, then the root of the
hierarchy is used.
Returns
-------
list[T]
A list of objects returned by user provided function mapped over
the Prefabs matching the path query.
"""
ret: list[T] = []
for prefab in self._fs.getObjects(path, path_root):
val = fn(prefab)
if val is not None:
ret.append(val)
return ret