# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Contains the MultibodyWebUI class."""
import textwrap
import atexit
from typing import Callable, Any
import Karana.Dynamics as kd
import Karana.Core as kc
import Karana.Scene as ks
import Karana.WebUI as kw
import Karana.Frame as kf
import Karana.KUtils.visjs as vjs
from ._infopanel import InfoPanel
__all__ = ["MultibodyWebUI"]
class CleanupCallbacks:
def __init__(self, verbose=False):
self._callbacks = []
self._verbose = verbose
atexit.register(self.execute)
def __call__(self, cb_or_name: str | Callable):
if isinstance(cb_or_name, str):
def _inner(cb: Callable):
self._callbacks.append((cb_or_name, cb))
return cb
return _inner
self._callbacks.append(("unnamed", cb_or_name))
return cb_or_name
def execute(self):
while self._callbacks:
name, cb = self._callbacks.pop()
if self._verbose:
print(f"Running cleanup {name}")
cb()
if self._verbose:
print(f"Finished cleanup {name}")
[docs]
class MultibodyWebUI:
"""MultibodyWebUI class.
This class creates a Multibody-centric web-based GUI
"""
def __init__(self, mbody: kd.Multibody, *, port: int = 29534, stick_parts: bool = False):
"""Create a MultibodyWebUI instance.
Parameters
----------
mbody: Multibody
The Multibody to create a GUI for
port: int
The port to run the GUI server on. Defaults to 29534.
stick_parts: bool
If True, create stick parts for the multibody. Defaults to
False.
"""
self.mbody: kd.Multibody = mbody
self.cleanups = CleanupCallbacks(verbose=False)
# main server setup
self.server = self._setupServer(port=port)
self.router = kw.Router(self.server)
# Shared state for the selected item
self.selection: ks.State = kw.State(self.router)
# 3d graphics setup
self.scene: ks.ProxyScene = self._ensureScene()
self.graphics: ks.WebScene = self._ensureGraphics()
self.graphics_frame = kw.IFrame(self.router, self.graphics.server().guessUrl())
self.outline_parts = []
# visjs graph viz setup
self.visjs_server = self._setupVisjs()
self.visjs_frame = kw.IFrame(self.router, self.visjs_server.getUrl())
# selection info box setup
self.info_box = self._setupInfoBox()
# multibody tree view setup
self.tree_view = self._setupTreeView()
# dock layout setup
self.dock = kw.Dock(self.router)
self.dock.addChild(title="Graph View", widget=self.visjs_frame)
self.dock.addChild(
title="3D View",
widget=self.graphics_frame,
relative_to=self.visjs_frame,
direction="within",
)
self.dock.addChild(
title="Tree View",
widget=self.tree_view,
relative_to=self.graphics_frame,
direction="right",
)
self.dock.addChild(
title="Selection Info",
widget=self.info_box.wroot,
relative_to=self.tree_view,
direction="below",
)
self.dock.addToDomRoot()
self.cleanups("clobberFields")(self._clobberFields)
if stick_parts:
self.mbody.createStickParts()
def _clobberFields(self):
self.outline_parts = None
self.dock = None
self.tree_view = None
self.info_box = None
self.visjs_frame = None
self.visjs_server = None
self.graphics_frame = None
self.graphics = None
self.scene = None
self.router = None
self.server = None
self.mbody = None
def _setupServer(self, port: int) -> kw.HttpWsServer:
# main server setup
server = kw.HttpWsServer(port=port)
frontend = kc.findShareDir() / "WebUI" / "frontend"
server.serveFile("/", frontend / "index.html")
server.serveFile("/main.js", frontend / "router-main-bundle.js")
server.serveFile("/style.css", frontend / "style.css")
# Disabling this. If outside code adds extra widgets not
# managed by this class, they'll need the server to keep
# running.
# self.cleanups("cleanupServer")(server.close)
return server
def _ensureScene(self) -> ks.ProxyScene:
mbody = self.mbody
if not mbody.getScene():
# Scene doesn't already exist, so we will create it
scene = ks.ProxyScene("webui_auto_scene", mbody.virtualRoot())
# Add a callback to cleanup the Scene we created
@self.cleanups("clearScene")
def clearScene():
mbody.setScene(None)
kc.discard(scene)
# Add the new scene to the multibody
mbody.setScene(scene)
return mbody.getScene()
def _ensureGraphics(self) -> ks.WebScene:
# Scan for an existing WebScene instances
for client in self.scene.clientScenes():
if isinstance(client, ks.WebScene):
break
else:
# No existing WebScene, so we create one
client = ks.WebScene("webui_auto_web_scene", kw.Server(port=0))
# Add the WebScene as a client of ProxyScene
self.scene.registerClientScene(client, self.mbody.virtualRoot())
scene = self.scene
@self.cleanups("clearGraphics")
def clearGraphics():
scene.unregisterClientScene(client)
kc.discard(client)
def onPick(part: ks.WebScenePart):
proxy_part = self.scene.lookupProxyFromImpl(part)
frame = proxy_part.ancestorFrame()
self.selection.set(frame.id())
client.setOnPick(onPick)
def _update(id: int):
base_container = kc.BaseContainer.singleton()
try:
item = base_container.at(id)
except IndexError:
return
if not isinstance(item, kf.Frame):
# At the moment we only know what to do for frames
return
# Remove the outline on the previous selection
for part in self.outline_parts:
part.removeOutline()
# Outline parts attached to the selected frame
self.outline_parts = []
for proxy_node in self.scene.getNodesAttachedToFrame(item):
if not isinstance(proxy_node, ks.ProxyScenePart):
continue
part = proxy_node.graphics(self.graphics)
part.outline()
self.outline_parts.append(part)
self.selection.onChange(_update)
return client
def _setupVisjs(self) -> vjs.MultibodyGraphServer:
selection = self.selection
class ServerWithSelectCallback(vjs.MultibodyGraphServer):
def onClickNode(self, client_id: int, node_id: int | str):
selection.set(node_id)
server = ServerWithSelectCallback(
subgraph=self.mbody,
label_map=None,
buttons=None,
extra_edges=None,
port=0,
title="Multibody System",
)
selection.onChange(server.setSelection)
# Add a callback to cleanup the visjs web server
@self.cleanups("cleanupVisjsServer")
def cleanupVisjsServer():
server.close()
return server
def _setupTreeView(self) -> kw.TreeView:
def bodyToTvNode(body: kd.PhysicalBody) -> kw.TreeView.Node:
tv_node = kw.TreeView.Node(
id=body.id(),
label=body.name(),
)
if parent := body.physicalParentBody():
tv_node.parent = parent.id()
return tv_node
tv_nodes = [
kw.TreeView.Node(
id=self.mbody.id(),
label=self.mbody.name(),
)
]
bodies = self.mbody.sortedPhysicalBodiesList()
tv_nodes.extend(bodyToTvNode(body) for body in bodies)
return kw.TreeView(self.router, tv_nodes, selection=self.selection)
def _setupInfoBox(self) -> InfoPanel:
info_box = InfoPanel(self.router, self.mbody)
# Register cleanup callback
self.cleanups("cleanupInfoBox")(info_box.close)
def _update(id):
base_container = kc.BaseContainer.singleton()
try:
item = base_container.at(id)
except IndexError:
return
info_box.updateFor(item)
self.selection.onChange(_update)
return info_box
[docs]
def close(self):
"""Idempotently close the GUI and cleanup created objects."""
self.cleanups.execute()
[docs]
def __del__(self):
"""Idempotently close the GUI and cleanup created objects."""
self.close()