# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Prefab base class and associated classes/functions."""
import numpy as np
from pydantic import (
Field,
ModelWrapValidatorHandler,
SerializeAsAny,
field_validator,
model_validator,
)
from typing_extensions import TypeVar
from typing import Generic, Any, Self, Callable
from .DataStruct import DataStruct, NestedBaseMixin, PyClassDS, IdMixin
from Karana.Core import Base, CppWeakRef
from .FilesystemLikeDatabase import FilesystemLikeDatabase
from Karana.Dynamics import PhysicalBody, Node
from Karana.Dynamics.SOADyn_types import BodyDS, NodeDS
[docs]
class Config(DataStruct, NestedBaseMixin):
"""Base class for Prefab Config."""
pass
[docs]
class Context(DataStruct, NestedBaseMixin):
"""Base class for Prefab Context."""
pass
[docs]
class Params(DataStruct, NestedBaseMixin):
"""Base class for Prefab Params."""
pass
ConfigType = TypeVar("ConfigType", bound=Config, default=Config)
ContextType = TypeVar("ContextType", bound=Context, default=Context)
ParamsType = TypeVar("ParamsType", bound=Params, default=Params)
[docs]
class PrefabDS(DataStruct, NestedBaseMixin, IdMixin, Generic[ConfigType, ContextType, ParamsType]):
"""DataStruct for a Prefab.
Classes derived from Prefab may derive from this and override toDS/fromDS as desired. However,
this should work for most Prefabs as is.
Parameters
----------
name: str
Name of the instance.
config: SerializeAsAny[Config]
Config for the Prefab.
context: SerializeAsAny[Context]
Context for the Prefab.
params: SerializeAsAny[Params]
Params for the Prefab.
parent: str
Name of parent for this prefab relative to the first prefab that was not
added by an addChildPrefab method.
children: list[SerializeAsAny[DataStruct]]
Children Prefabs of this Prefab.
class_name: PyClassDS
The class that this DataStruct represents. Used to create a new instance
of the class from this DataStruct.
"""
name: str
config: SerializeAsAny[ConfigType]
context: SerializeAsAny[ContextType]
params: SerializeAsAny[ParamsType]
parent: str
children_raw: list[SerializeAsAny[DataStruct]] = Field(alias="children")
class_name: PyClassDS
@property
def children(self) -> list["PrefabDS[Any, Any, Any]"]:
"""Get the children Prefabs.
Returns
-------
list[PrefabDS[Any, Any, Any]]
The children Prefabs.
"""
return self.children_raw # pyright: ignore
@children.setter
def children(self, children: list["PrefabDS[Any, Any, Any]"]):
"""Set the children Prefabs.
Parameters
----------
children : list[Prefab[Any, Any, Any]]
The value to set the children Prefabs to.
"""
self.children_raw = children # pyright: ignore
@field_validator("children_raw", mode="after")
def _childrenCheck(cls, v: list[DataStruct]) -> list[DataStruct]:
"""Ensure all children are Prefabs.
Parameters
----------
v : list[DataStruct]
The children to check.
Returns
-------
list[DataStruct]
The children after they have been validated.
"""
for c in v:
if not isinstance(c, PrefabDS):
raise ValueError(f"Expected all children to be Prefab, but got {type(c)}")
return v
@model_validator(mode="wrap")
@classmethod
def _useCorrectType(cls, data: Any, handler: ModelWrapValidatorHandler[Self]) -> Self:
"""Use the correct type when deserializing.
This uses the added field to get the most derived type and deserialize appropriately.
This is provided that SerializeAsAny was used when serializing.
Parameters
----------
data : Any
The data to deserialize from.
handler : ModelWrapValidatorHandler[Self]
The handler used for deserialization of the base class.
Returns
-------
Self
An instance of the most derived type.
"""
if isinstance(data, dict):
d = data["children"]
for k in range(len(d)):
if isinstance(d[k], dict) and "ds_python_class" in d[k]:
ds_python_class = PyClassDS(**d[k]["ds_python_class"])
klass = ds_python_class.toClass()
d[k] = klass(**d[k])
return handler(data)
else:
return handler(data)
[docs]
def toPrefab(
self, parent_or_top: "Prefab[Any, Any, Any] | None", context: ContextType | None = None
) -> "Prefab[Any, Any, Any]":
"""Convert this PrefabDS to an instance of the associated Prefab.
This will also create instances of any children Prefabs if applicable.
Parameters
----------
parent_or_top: Prefab[Any, Any, Any] | None
The parent prefab or top_prefab. First, we will try to find the parent string
from the PrefabDS on this value. If it exists, we will use that as the parent.
Otherwise, we will use this as the parent directly. If None, then we will assume
the Prefab being created is the top-level Prefab.
context : ContextType | None = None
Optional context to override what was deserialized. If None, then use what was
deserialized.
Returns
-------
Prefab[Any, Any, Any]
An instance of the associated Prefab.
"""
# Get the class associated with the child
klass = self.class_name.toClass()
if parent_or_top is None:
# If top_prefab is None, then parent is None, since this is
# the top_prefab
parent = None
else:
# Otherwise, look up the parent
try:
parent = parent_or_top._fs.getObject(self.parent, parent_or_top)
except:
parent = parent_or_top
# Create an instance of it from the DataStruct
if context is not None:
obj = klass(
self.name, self.config, context, self.params, parent
) # pyright: ignore - Pyright false positive.
else:
obj = klass(
self.name, self.config, self.context, self.params, parent
) # pyright: ignore - Pyright false positive.
# Add children
for c in self.children:
c.toPrefab(obj)
return obj
[docs]
class Prefab(Base, Generic[ConfigType, ContextType, ParamsType]):
"""The Prefab class holds a prefab for a simulation.
Prefabs typically consist of one or more of the following:
* Bodies
* Models
* Other prefabs
Attributes
----------
config : ConfigType
The configuration for the prefab.
context : ContextType
The context for the prefab.
params : ParamsType
The parameters for the Prefab.
"""
def __init__(
self,
name: str,
config: ConfigType,
context: ContextType,
params: ParamsType,
parent_prefab: "Prefab[Any, Any, Any] | None" = None,
):
"""Create an instance of Prefab.
Parameters
----------
name : str
The name for this Prefab.
config : ConfigType
The configuration for the prefab.
context : ContextType
The context for the prefab.
params : ParamsType
The parameters for the Prefab.
"""
# Call the base constructor
super().__init__(name)
# Assign values from incoming arguments
self.config: ConfigType = config
self.context: ContextType = context
self.params: ParamsType = params
# Variables to keep track of the parent and children
self._parent: Prefab[Any, Any, Any] | None = None
self._children: list[Prefab[Any, Any, Any]] = []
# This keeps track of children added by the addChildPrefabs method
self._children_added_by_method: list[CppWeakRef[Prefab[Any, Any, Any]]] = []
# Variable to indicate whether the initialization code has run for this prefab or not
self._initialized: bool = False
# Variable to indicate whether we need to recompute the FilesystemLikeDatabase or not
self._fs_recompute_v = True
# Run any extra logic before setting parent and running compatibility check
self.runBeforeInitialSetParent()
# Set the parent. This will also run the compatibility check.
self.parent = parent_prefab
# Call setup functions if applicable
# This should only run if:
# 1) This is a top-level assembpy (no parent)
# 2) Or, this is not a top-level assembly, but the parent has already been initialized
if self.parent is None or (self.parent is not None and self.parent._initialized):
self._addChildPrefabs()
self._addMultibodyObjects()
self._addKModels()
self._connectKModels()
self._setInitialized(True)
@property
def parent(self) -> "Prefab[Any, Any, Any] | None":
"""Returns the parent prefab of this prefab.
Returns
-------
"Prefab[Any, Any, Any] | None"
The parent prefab of this prefab if it exists.
"""
return self._parent
@parent.setter
def parent(self, parent: "Prefab[Any, Any, Any] | None"):
# Now, check for compatibility with parent if applicable
# Always check the parent is compatible, even if it is None
if not self.checkParentCompatible(parent):
raise ValueError(
f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}"
)
# If we have a parent, then also check we are a compatible child.
if parent is not None:
if not parent.checkChildCompatible(self):
raise ValueError(
f"Error, the prefab {self.name} is not compatible with the parent_prefab {parent.name if parent is not None else None}"
)
# If we already have a parent, then remove the link between it and this.
# Also, recompute the FS on the old parent, as this child is no longer a part of it.
if self.parent is not None:
self.parent._fs_recompute = True
self.parent._children.remove(self)
# Add parent/child pair if compatiblity checks
self._parent = parent
if parent is not None:
parent._children.append(self)
# Reset _fs_recompute, as the filesystem needs to be recomputed for the hierarchy
# that this prefab has been added to.
self._fs_recompute = True
@property
def _fs_recompute(self) -> bool:
"""Get whether the filesystem needs to be recomputed.
Returns
-------
bool
Whether the filesystem needs to be recomputed.
"""
# We need to get this at the top level of the hierarchy, so traverse up
# and get the value at the top-level (where parent is None)
if self.parent is not None:
return self.parent._fs_recompute
else:
return self._fs_recompute_v
@_fs_recompute.setter
def _fs_recompute(self, value: bool):
"""Set whether the filesystem needs to be recomputed.
Parameters
-------
value: bool
Whether the filesystem needs to be recomputed.
"""
# We need to set this at the top level of the hierarchy, so traverse up
# and set the value at the top-level (where parent is None)
if self.parent is not None:
self.parent._fs_recompute = value
else:
self._fs_recompute_v = value
@property
def _fs(self) -> "FilesystemLikeDatabase[Prefab[Any, Any, Any]]":
"""Get the filesystem-like database for this prefab hierarchy.
Returns
-------
FilesystemLikeDatabase[Prefab[Any, Any, Any]]
The filesystem-like database for this prefab hierarchy.
"""
# If this is not a top-level Prefab, then get the _fs from the top-level
if self.parent is not None:
return self.parent._fs
# If we are here, then this is a top-level prefab. Only recompute if we have to
# i.e., self._fs_recompute is True.
if self._fs_recompute:
# Create the FilesystemLikeDatabase
self._fs_v = FilesystemLikeDatabase[Prefab[Any, Any, Any]]()
# Add ourselves as the root
self._fs_v.addObject(self.name(), self, "/")
# Add all children recursively
def _recurse(
p: Prefab[Any, Any, Any], _fs: FilesystemLikeDatabase[Prefab[Any, Any, Any]]
):
for c in p._children:
_fs.addObject(c.name(), c, p)
_recurse(c, _fs)
_recurse(self, self._fs_v)
self._fs_recompute = False
return self._fs_v
[docs]
def runBeforeInitialSetParent(self):
"""Add any logic/commands that should run before the parent Prefab is set to this method.
This method runs during construction just before the parent Prefab is set.
"""
pass
[docs]
def checkParentCompatible(self, parent: "Prefab[Any, Any, Any] | None") -> bool:
"""Check whether the parent Prefab this Prefab is becoming a child of is compatible.
Parameters
----------
parent : Prefab | None
The parent Prefab this Prefab is becoming a child of. Can be None if there
is no parent.
Returns
-------
bool
True if the parent prefab is compatible, False otherwise.
"""
return True
[docs]
def checkChildCompatible(self, child: "Prefab[Any, Any, Any]") -> bool:
"""Check whether Prefab becoming a child of this Prefab is compatible.
Parameters
----------
child: Prefab
The Prefab becoming a child of this Prefab.
Returns
-------
bool
True if the child prefab is compatible, False otherwise.
"""
return True
[docs]
def addChildPrefabs(self) -> None:
"""Add child prefabs to this prefab.
Override this method and add any child prefabs needed by this prefab.
"""
pass
def _addChildPrefabs(self) -> None:
"""Call addChildPrefabs on this and all children recursively."""
children_before = set(self._children)
self.addChildPrefabs()
children_after = set(self._children)
self._children_added_by_method = [CppWeakRef(x) for x in children_after - children_before]
for c in self._children:
c._addChildPrefabs()
[docs]
def addMultibodyObjects(self) -> None:
"""Add any multibody objects used by this prefab.
Override this method and add any multibody objects (e.g., PhysicalBodys,
Nodes, etc.) used by this prefab.
"""
pass
def _addMultibodyObjects(self) -> None:
"""Call addMultibodyObjects on this and all children recursively."""
self.addMultibodyObjects()
for c in self._children:
c._addMultibodyObjects()
[docs]
def addKModels(self) -> None:
"""Add KModels needed by this prefab.
Override this method and add any KModels needed by this prefab.
"""
pass
def _addKModels(self) -> None:
"""Call addKModels on this and all children recursively."""
self.addKModels()
for c in self._children:
c._addKModels()
[docs]
def connectKModels(self) -> None:
"""Add any connections needed by KModels.
Override this method and add logic to connect KModels
in this prefab to other KModels.
"""
pass
def _connectKModels(self) -> None:
"""Call connectKModels on this and all children recursively."""
self.connectKModels()
for c in self._children:
c._connectKModels()
def _setInitialized(self, value: bool) -> None:
"""Set this Prefab and its children's initialized status to value.
Parameters
----------
value : bool
The value to set the _initialized member to.
"""
self._initialized = value
for c in self._children:
c._setInitialized(value)
[docs]
def addBody(
self,
parent_body: PhysicalBody,
child_body: PhysicalBody | BodyDS,
) -> PhysicalBody:
"""Add a body to this Prefab.
If this body already exists, just ensure it has the correct parent. Otherwise,
create it with the correct parent.
Parameters
----------
parent_body : PhysicalBody
The parent body to attach the new body to.
child_body : PhysicalBody | BodyDS
The child_body to add. If BodyDS, then a new body is created;
otherwise, we check this has the correct parent and return it.
Returns
-------
PhysicalBody
The body that was either checked or created.
"""
if isinstance(child_body, PhysicalBody):
if child_body.physicalParentBody() is parent_body:
return child_body
else:
raise ValueError(
f"The child body provided, {child_body.name()}, is not the child of the provided parent body, {parent_body.name()}. The parent is {child_body.physicalParentBody().name()}."
)
else:
return child_body.toBody(parent_body)
[docs]
def addNode(
self,
parent_body: PhysicalBody,
node: Node | NodeDS,
) -> Node:
"""Add a node to this Prefab.
If this node already exists, just ensure it has the correct parent. Otherwise,
create it with the correct parent.
Parameters
----------
parent_body : PhysicalBody
The parent node to attach the new node to.
child_node : Node | NodeDS
The child_node to add. If NodeDS, then a new node is created;
otherwise, we check this has the correct parent and return it.
Returns
-------
Node
The node that was either checked or created.
"""
if isinstance(node, Node):
if node.parentBody() is parent_body:
return node
else:
raise ValueError(
f"The node provided, {node.name()}, is not the child of the provided parent body, {parent_body.name()}. The parent is {node.parentBody().name()}."
)
else:
return node.toNode(parent_body)
[docs]
def toDS(self) -> PrefabDS[ConfigType, ContextType, ParamsType]:
"""Create a PrefabDS from this Prefab.
Returns
-------
PrefabDS
The DataStruct that represents this Prefab.
"""
parents = []
children_added_by_method = []
p = self.parent
while p is not None:
children_added_by_method += [
x() for x in p._children_added_by_method if x() is not None
]
parents.append(p)
p = p.parent
effective_parent = None
for p in parents:
if p not in children_added_by_method:
effective_parent = p
break
def _getNameFromParent(name: str, p: Prefab[Any, Any, Any] | None) -> str:
if p is None or p is effective_parent:
# * If p is None, then we hit the top of the hierarchy and are done
# * If p is the effective parent, then we have the location of this
# prefab relative to the effective parent we want to be added to.
return name
else:
if name == "":
name = p.name()
else:
name = f"{p.name()}/{name}"
return _getNameFromParent(name, p.parent)
parent_name = _getNameFromParent("", self.parent)
def _addChildren(p: Prefab[Any, Any, Any]) -> list[Prefab[Any, Any, Any]]:
children = []
self_added_prefabs = [x() for x in p._children_added_by_method]
for c in p._children:
if c not in self_added_prefabs:
children.append(c)
else:
children += _addChildren(c)
return children
return PrefabDS[type(self.config), type(self.context), type(self.params)](
name=self.name(),
config=self.config.model_copy(deep=True),
context=self.context.model_copy(deep=True),
params=self.params.model_copy(deep=True),
parent=parent_name,
children=[c.toDS() for c in _addChildren(self)],
class_name=PyClassDS.fromObj(self),
)
[docs]
@classmethod
def fromDS(
cls,
data_struct: PrefabDS[ConfigType, ContextType, ParamsType],
parent_or_top: "Prefab[Any, Any, Any] | None",
*_,
**kwargs,
) -> Self:
"""Convert the PrefabDS to an instance of this Prefab.
This will also create instances of the associated children if applicable.
Parameters
----------
data_struct : PrefabDS
The PrefabDS to create an instance of this Prefab from.
parent_or_top: Prefab[Any, Any, Any] | None
The parent prefab or top_prefab. First, we will try to find the parent string
from the PrefabDS on this value. If it exists, we will use that as the parent.
Otherwise, we will use this as the parent directly. If None, then we will assume
the Prefab being created is the top-level Prefab.
Returns
-------
Self
An instance of this Prefab.
"""
return data_struct.toPrefab(parent_or_top) # pyright: ignore - False positive
[docs]
def requestFromHierarchy[
T
](
self,
path: str,
fn: "Callable[[Prefab[Any, Any, Any]], T | None]",
path_root: "Prefab[Any, Any, Any] | None" = None,
) -> list[T]:
"""Make a request to the current hierarchy of prefabs.
This works by finding all the prefabs in the hierarchy that match the given
path as searched from the path_root. If the path_root is None, then the root
of the Prefab hierarchy is used. The provided `fn` is run on all the returned
Prefab's, and any non-None values returned from it are combined into a list and
returned.
Parameters
----------
path : str
The path to use when finding prefabs in the hierarchy. This supports
filesystem-like syntax, so globbing, wildcards, etc. can be used.
fn : Callable[[Prefab[Any, Any, Any]], T | None]
The function to run on each Prefab matching path from the hierarchy.
path_root : "Prefab[Any, Any, Any] | None"
The root to search from. If None is provided, then the root of the
hierarchy is used.
Returns
-------
list[T]
A list of objects returned by user provided function mapped over
the Prefabs matching the path query.
"""
ret: list[T] = []
for prefab in self._fs.getObjects(path, path_root):
val = fn(prefab)
if val is not None:
ret.append(val)
return ret
[docs]
def requestUniqueFromHierarchy[
T
](
self,
path: str,
fn: "Callable[[Prefab[Any, Any, Any]], T | None]",
path_root: "Prefab[Any, Any, Any] | None" = None,
) -> T:
"""Make a request to the current hierarchy of prefabs.
This works by finding all the prefabs in the hierarchy that match the given
path as searched from the path_root. If the path_root is None, then the root
of the Prefab hierarchy is used. The provided `fn` is run on all the returned
Prefabs, if one, unique non-None value is returned from applying `fn` to these
Prefabs, then it is returned. If there is not one, unique non-None value, then
an error message is thrown.
Parameters
----------
path : str
The path to use when finding prefabs in the hierarchy. This supports
filesystem-like syntax, so globbing, wildcards, etc. can be used.
fn : Callable[[Prefab[Any, Any, Any]], T | None]
The function to run on each Prefab matching path from the hierarchy.
path_root : "Prefab[Any, Any, Any] | None"
The root to search from. If None is provided, then the root of the
hierarchy is used.
Returns
-------
T
A unique object returned by the user provided function mapped over
the Prefabs matching the path query.
"""
# Find all values T from applying the function to the prefabs
candidates: list[T] = self.requestFromHierarchy(path, fn, path_root=path_root)
# Ensure that the return value is unique, and return it if it is.
# Otherwise, raise an error.
len_candidates = len(candidates)
if len_candidates == 1:
return candidates[0]
elif len_candidates == 0:
raise ValueError("Did not find any values that match the request.")
else:
raise ValueError(f"Found {len_candidates} values rather than one unique value.")
[docs]
def requestClosestFromHierarchy[
T
](
self,
path: str,
fn: "Callable[[Prefab[Any, Any, Any]], T | None]",
path_root: "Prefab[Any, Any, Any] | None" = None,
) -> T:
"""Make a request to the current hierarchy of prefabs.
This works by finding all the prefabs in the hierarchy that match the given
path as searched from the path_root. If the path_root is None, then the root
of the Prefab hierarchy is used. The provided `fn` is run on all the returned
Prefabs. Then, the one that is closest is returned. If there is a tie for the
closest or None are returned, then an error is throw.
Parameters
----------
path : str
The path to use when finding prefabs in the hierarchy. This supports
filesystem-like syntax, so globbing, wildcards, etc. can be used.
fn : Callable[[Prefab[Any, Any, Any]], T | None]
The function to run on each Prefab matching path from the hierarchy.
path_root : "Prefab[Any, Any, Any] | None"
The root to search from. If None is provided, then the root of the
hierarchy is used.
Returns
-------
T
The closest object returned by the user provided function mapped over
the Prefabs matching the path query.
"""
# Find all values T from applying the function to the prefabs
candidates: list[tuple[T, int]] = []
distances: list[int] = []
for prefab, dist in self._fs.getObjectsAndDists(path, path_root):
val = fn(prefab)
if val is not None:
candidates.append((val, dist))
distances.append(dist)
if len(distances) == 0:
raise ValueError(
"Could not find any values that match the user-supplied function mapped over the given path for Prefabs"
)
distances_np = np.array(distances, np.int32)
argmin = np.argmin(distances_np)
count = np.count_nonzero(distances_np == distances_np[argmin])
if count == 1:
return candidates[argmin][0]
else:
raise ValueError(
f"There was a {count}-way tie for closest value for the given user-supplied ufuntion mapped over the given path for Prefabs"
)