Source code for Karana.KUtils.MultibodyTUI.swing

"""Classes and functions used to swing a joint or constraint."""

from dataclasses import dataclass, replace
from typing import Literal
import math

import numpy as np
from weakref import ref

from Karana.Dynamics import (
    Multibody,
    StatePropagator,
    PhysicalSubhinge,
    PhysicalHinge,
    MMSolverType,
    TimedEvent,
    LoopConstraintConVel,
)
from Karana.Math import IntegratorType
from Karana.Core import discard

from .dialog import MenuEntry, MenuTUI, ChoiceTUI, ChoiceData, EntryDialog
from .notify import Notifier
from . import terminal as term

SwingModeOptions = Literal["simple_ik", "no_ik", "udot_pulse"]


[docs] @dataclass class SwingConfig: """Configuration for controlling a swing.""" mode: SwingModeOptions = "udot_pulse" amplitude: float = 0.4 step_size: float = 0.1 delta_t: float = 6.0 render: bool = False
[docs] class SwingConfigMenuTUI(MenuTUI): """The top-level menu for configuring swing settings.""" def __init__(self, init_config: SwingConfig | None = None): """Create a SwingConfigMenuTUI instance. Parameters ---------- init_config : SwingConfig | None The initial config for the swing. """ if init_config: # Make a copy we can safely modify self._config = replace(init_config) else: self._config = SwingConfig() self._modified = False self._confirmed = False self._mode_entry = MenuEntry( callback=self._handleMode, brief=f"Swing mode [{self._config.mode}]", description="Type of swing (press enter for details)", ) self._amplitude_entry = MenuEntry( callback=self._handleAmplitude, brief=f"Amplitude [{self._config.amplitude}]", description="Amplitude of the swing", ) self._step_size_entry = MenuEntry( callback=self._handleStepSize, brief=f"Step size [{self._config.step_size}]", description="Step size of the swing (only in pulse_udot mode)", ) self._delta_t_entry = MenuEntry( callback=self._handleDeltaT, brief=f"DeltaT [{self._config.delta_t}]", description="Period of the swing (only in pulse_udot mode)", ) self._render_entry = MenuEntry( callback=self._handleRender, brief=f"Render images [{self._config.render}]", description="Whether to render the visualization to a series of images files (only in pulse_udot mode)", ) super().__init__( header=[ "Swing configuration menu", " enter: edit/confirm option", " q / Ctrl-C: cancel", " ↓ / j: move down", " ↑ / k: move up", " 1-7: jump to choice", ], entries=[ self._mode_entry, self._amplitude_entry, self._step_size_entry, self._delta_t_entry, self._render_entry, MenuEntry( callback=self._handleCancel, brief="Cancel", done=True, ), MenuEntry( callback=self._handleSubmit, brief=term.bold("Apply changes"), done=True, ), ], ) @property def config(self) -> SwingConfig | None: """Return the config for the swing. Returns ------- SwingConfig | None The swing config. """ if not self._confirmed: return None return replace(self._config) def _handleMode(self): tui = SwingModeTUI(mode=self._config.mode) if choice := tui.choice: mode = choice.value self._modified = True self._config.mode = mode self._mode_entry.brief = f"Swing mode [{mode}*]" def _handleAmplitude(self): tui = AmplitudeDialog(amplitude=self._config.amplitude) entry = tui.run() if entry is None: return self._modified = True amplitude = float(entry) self._config.amplitude = amplitude self._amplitude_entry.brief = f"Amplitude [{amplitude}*]" def _handleStepSize(self): tui = StepSizeDialog(step_size=self._config.step_size) entry = tui.run() if entry is None: return self._modified = True step_size = float(entry) self._config.step_size = step_size self._step_size_entry.brief = f"Step size [{step_size}*]" def _handleDeltaT(self): tui = DeltaTDialog(delta_t=self._config.delta_t) entry = tui.run() if entry is None: return self._modified = True delta_t = float(entry) self._config.delta_t = delta_t self._delta_t_entry.brief = f"DeltaT [{delta_t}*]" def _handleRender(self): self._modified = True render = not self._config.render self._config.render = render self._render_entry.brief = f"Render images [{render}*]" def _handleCancel(self): pass def _handleSubmit(self): self._confirmed = True
[docs] def footer(self) -> list[str]: """Return the footer message to use. Returns ------- list[str] The footer message to use. """ callback = self.current.callback if callback == self._handleCancel: if self._modified: return [term.error("Press enter to discard changes")] else: return [] if callback == self._handleSubmit: if self._modified: return [term.info("Press enter to apply changes")] else: return [] return [term.info("Press enter to modify")]
[docs] class SwingModeTUI(ChoiceTUI): """TUI for controlling the swing mode.""" def __init__(self, mode: SwingModeOptions): """Create an instance of SwingModeTUI. Parameters ---------- mode : SwingModeOptions The options to use for the swing mode. """ choices = [ ChoiceData( brief="Kinematic pulse (udot_pulse)", description="Do a kinematic sim, pulsing acceleration to drive the hinge", value="udot_pulse", ), ChoiceData( brief="Simple (simple_ik)", description="Continually set the hinge coordinates and do IK to solve constraints", value="simple_ik", ), ChoiceData( brief="Articulate without IK (no_ik)", description="Continually set the hinge coordinates without doing IK", value="no_ik", ), ] # Start with the current mode selected index = 0 for i, choice in enumerate(choices): if mode == choice.value: index = i super().__init__( header=[ "Select swing mode", " enter: confirm", " q / Ctrl-C: cancel", " ↓ / j: move down", " ↑ / k: move up", " 1-4: jump to choice", ], choices=choices, index=index, )
[docs] class AmplitudeDialog(EntryDialog): """Dialog for the swing amplitude.""" def __init__(self, amplitude): """Create an instance of AmplitudeDialog. Parameters ---------- amplitude : The amplitude to use. """ entry = str(amplitude) super().__init__(prompt=["Enter the new swing amplitude (Ctrl-C to abort):"], entry=entry)
[docs] def isValid(self) -> bool: """Check if the user entry is a valid amplitude. Returns ------- bool True if this is a valid amplitude, False otherwise. """ try: value = float(self.entry) except ValueError: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: """Return the footer message. Returns ------- list[str] The footer message. """ if not self.entry: return ["Please type a number"] if not self.isValid(): return [term.warn("Invalid amplitude - must be a number")] return ["Press enter to confirm"]
[docs] class StepSizeDialog(EntryDialog): """Dialog for the swing step size.""" def __init__(self, step_size): """Create an instance of StepSizeDialog. Parameters ---------- step_size : The step size to use. """ entry = str(step_size) super().__init__( prompt=["Enter the new swing step size in seconds (Ctrl-C to abort):"], entry=entry )
[docs] def isValid(self) -> bool: """Check if the user entry is a valid step size. Returns ------- bool True if the user entry is a valid step size, False otherwise. """ try: value = float(self.entry) except ValueError: return False if value <= 0: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: """Return the footer message. Returns ------- list[str] The footer message. """ if not self.entry: return ["Please type a positive number"] if not self.isValid(): return [term.warn("Invalid step size - must be a positive number")] return ["Press enter to confirm"]
[docs] class DeltaTDialog(EntryDialog): """Dialog for the swing delta_t.""" def __init__(self, delta_t): """Create an instance of DeltaTDialog. Parameters ---------- delta_t : The delta_t to use. """ entry = str(delta_t) super().__init__(prompt=["Enter the new swing delta_t (Ctrl-C to abort):"], entry=entry)
[docs] def isValid(self) -> bool: """Check if the users delta_t is valid. Returns ------- bool True if the user's delta_t is valid, False otherwise. """ try: value = float(self.entry) except ValueError: return False if value <= 0: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: """Return the footer message. Returns ------- list[str] The footer message. """ if not self.entry: return ["Please type a positive number"] if not self.isValid(): return [term.warn("Invalid delta_t - must be a positive number")] return ["Press enter to confirm"]
[docs] class KinMotion: """Class for performing a kinematic motion on the multibody."""
[docs] def __del__(self): """Delete this instance of KinMotion.""" discard(self.state_propagator)
def __init__(self, multibody: Multibody): """Create an instance of KinMotion. Parameters ---------- multibody : Multibody The Multibody to perform kinematic motions on. """ self.multibody = multibody self.delta_t = 6.0 self.pulse_t = self.delta_t / 8.0 # the driven_subhinge is normally a subhinge, but a loop # constraint for convels self.driven_subhinge = None # a negative subhinge index implies that we are driving a convel # loop constraint self.subhinge_index = 0 self.move_start_t = None self.A = None # flag to control whether to render images for a movie self.render = False self.gsm = None self.state_propagator = self._setupProp() # Add model to update plotting self.dash_app = self.getDashApp() from Karana.Models import DataPlotter dp = DataPlotter("dataplotter", self.state_propagator, self.dash_app) h = 0.1 dp.setPeriod(h) def _ensureRendering(self, render: bool): # Do nothing if the gsm model wasn't created (e.g., no scene) if not self.gsm: if scene := self.multibody.getScene(): # First time, create model and register from Karana.Models import GraphicalSceneMovie self.gsm = GraphicalSceneMovie("swing", self.state_propagator, scene) self.gsm.params.dir = "movie" self.gsm.params.filename_prefix = "image" self.gsm.setPeriod(1.0 / 60.0) self.render = True # Early return, since we don't want any other logic the first time # or if the gsm doesn't exist return # Handle edge going from False to True - register gsm if render and not self.render: self.render = True self.state_propagator.registerModel(self.gsm) # Handle edge going from True to False - unregister gsm elif not render and self.render: self.render = False self.state_propagator.unregisterModel(self.gsm)
[docs] def getDashApp(self): """Get the DashApp. Returns ------- The DashApp. """ def time(): return 1e-9 * float(self.state_propagator.getTime()) def xx(): print("In xx", time()) return time() * 1.1 from Karana.KUtils.DataPlotter import DashApp multibody = self.multibody """ shg = multibody.getBody("part_TR_Steering.ges_steering_wheel_90").parentHinge().subhinge(0) pdstwheel = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Steering.ges_rack_89").parentHinge().subhinge(0) pdrack = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0) pdwheel = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = ( multibody.getBody("part_TR_Front_Suspension.gel_lower_control_arm2_18") .parentHinge() .subhinge(0) ) pdwheel2 = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0) pdwheel1 = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Rear_Suspension.ger_spindle_80").parentHinge().subhinge(0) pdspindle = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = ( multibody.getBody("part_TR_Rear_Suspension.ger_drive_shaft_82") .parentHinge() .subhinge(0) ) pdshaft = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Powertrain.ger_diff_output_122").parentHinge().subhinge(0) pddiff = SinglePlotData( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) dashapp = DashApp(data=[pdstwheel, pdrack, pdwheel, pddiff, pdshaft, pdspindle]) """ import os if os.getenv("DTEST_RUNNING", False): # Use port 0 if running with dtest dashapp = DashApp(data=[], port=0) else: dashapp = DashApp(data=[]) return dashapp
[docs] def move( self, subhinge: PhysicalSubhinge, amplitude: float, delta_t: float, render: bool, subhinge_index: int, step_size: float, ): """Move a subhinge through a motion. Parameters ---------- subhinge : PhysicalSubhinge The subhinge to move. amplitude : float The amplitude of the motion. delta_t : float The time it takes the motion to complete. render : bool Whether to render the motion or not. subhinge_index : int The index of the subhinge. step_size : float The step size to use for the motion. """ cks = self.multibody.cks() worked = False h = np.timedelta64(int(1e9 * step_size), "ns") te = TimedEvent("hop_size", h + self.state_propagator.getTime(), lambda _: None, False) te.period = h try: self.state_propagator.registerTimedEvent(te) self.driven_subhinge = subhinge self.subhinge_index = subhinge_index self.move_start_t = 1e-9 * float(self.state_propagator.getTime()) self.A = amplitude self.delta_t = delta_t self.pulse_t = self.delta_t / 8.0 self._ensureRendering(render) # prior = self.multibody.subhingeCoordData().frozenCoordsForKin() if subhinge_index >= 0: cks.ensureHealthy() cks.freezeCoord(subhinge, subhinge_index) self.state_propagator.advanceBy(self.delta_t + self.pulse_t) if subhinge_index >= 0: cks.unfreezeCoord(subhinge, subhinge_index) worked = True finally: tnew = self.state_propagator.getTime() # self.state_propagator.setState(tnew, self.x_init) self.state_propagator.setTime(tnew) self.state_propagator.setState(self.x_init) self.state_propagator.unregisterTimedEvent(te.name(), te.isBeforeHop()) # in case there was an exception, unfreeze the subhinge coordinate if not worked and subhinge_index >= 0: cks.unfreezeCoord(subhinge, subhinge_index) if scene := self.multibody.getScene(): scene.update()
def _getUdot(self, sp_t): """Return the (u, udot) value for the current time for this profile.""" t = 1e-9 * float(sp_t) - self.move_start_t a = self.A * 0.2 if t >= 0 and t < self.pulse_t: # initial acceleration to go forward udot = a u = a * t elif t >= self.pulse_t and t < (self.delta_t / 2 - self.pulse_t / 2): # coasting forward till start of slow down and reversal udot = 0 u = a * self.pulse_t elif t >= self.delta_t / 2 - self.pulse_t / 2.0 and t < ( self.delta_t / 2 + self.pulse_t / 2 ): # ramp down to start going backwards udot = -2 * a u = a * self.pulse_t - a * (t - self.delta_t / 2 - self.pulse_t / 2.0) elif t >= (self.delta_t / 2 + self.pulse_t / 2) and t < (self.delta_t - self.pulse_t): # coasting backward till start of slow down to rest udot = 0 u = -a * self.pulse_t elif t >= self.delta_t - self.pulse_t and t <= self.delta_t: # decelerate to come to a stop udot = a u = -a * (self.delta_t - t) else: # udot = 0 u = 0 # print("JJJ", t, u, udot) return u, udot def _preDeriv(self, t, _): u, udot = self._getUdot(t) delt = 1e-9 * float(t) - self.move_start_t if self.subhinge_index >= 0: # regular non-convel subhinge udotvec = np.zeros(self.driven_subhinge.nU()) udotvec[self.subhinge_index] = udot self.driven_subhinge.setUdot(udotvec) else: # driving a convel constraint self.driven_subhinge.setInput(u, udot) def _setupProp(self) -> StatePropagator: from Karana.Models import ( ProjectConstraintError, UpdateProxyScene, SyncRealTime, TimeDisplay, ) sp = StatePropagator( self.multibody, IntegratorType.EULER, None, None, MMSolverType.KINEMATICS ) # sp.setKinematicsMode(True) # preDeriv function with a weakref to avoid circular references weak = ref(self) def preDeriv(t, x): v = weak() if v is not None: v._preDeriv(t, x) sp.fns.pre_deriv_fns["set_udot"] = preDeriv t_init = np.timedelta64(0, "ns") # sp.setState(t_init, self.x_init) sp.setTime(t_init) self.x_init = sp.assembleState() sp.setState(self.x_init) if scene := self.multibody.getScene(): # Add model to update the scene model = UpdateProxyScene("update_scene", sp, scene) # Add model to limit speed model = SyncRealTime("sync_time", sp) # Add model to show sim time model = TimeDisplay("sim_time", sp, scene.graphics()) # Add model to fix constraint violation errors # Only do this if the multibody has constraints if len(self.multibody.enabledConstraints()) > 0: model = ProjectConstraintError( "sd", sp, self.multibody, sp.getIntegrator(), self.multibody.cks() ) model.params.tol = 1e-6 return sp
[docs] class SwingManager: """Manager for swing motions.""" def __init__(self, multibody: Multibody, notify: Notifier): """Create an instance of SwingManager. Parameters ---------- multibody : Multibody The multibody to manage swings for. notify : Notifier The notifier to send messages to. """ self._multibody = multibody self._kin_motion = KinMotion(multibody) self._notify = notify def _swingSimpleIk(self, hinge, amplitude, coord_offset=0): # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset}. A={amplitude}" ) self._multibody.articulateSubhinge( sh, subhinge_index=subhinge_offset, rangeQ=amplitude, dQ=0.04, pause=0.02, disable_ik=False, ) def _swingNoIk(self, hinge, amplitude, coord_offset=0): # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset} (no IK). A={amplitude}" ) self._multibody.articulateSubhinge( sh, subhinge_index=subhinge_offset, rangeQ=amplitude, dQ=0.04, pause=0, disable_ik=True, ) def _swingUdotPulse(self, hinge, amplitude, step_size, delta_t, render, coord_offset=0): try: # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) # print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset} (udot). A={amplitude}" ) self._kin_motion.move( sh, amplitude=amplitude, step_size=step_size, delta_t=delta_t, render=render, subhinge_index=subhinge_offset, ) except ValueError as err: self._notify.error(f"Motion failed: {err}")
[docs] def swingHinge( self, hinge: PhysicalHinge, mode: SwingModeOptions, amplitude: float, step_size: float, delta_t: float, render: bool, coord_offset: int, ): """Swing a hinge. Parameters ---------- hinge : PhysicalHinge The hinge to swing. mode : SwingModeOptions The mode used for the swing. amplitude : float The amplitude of the swing. step_size : float The step size for the swing. delta_t : float The time taken to complete the swing. render : bool Whether to render the swing or not. coord_offset : int The coordinate offset of the value changing during the swing. """ if coord_offset >= hinge.coordData().nU(): self._notify.error( f"The requested hinge coordinate {coord_offset} must be less than {hinge.coordData().nU()} - the number of hinge coordinates." ) return if mode == "simple_ik": self._swingSimpleIk(hinge, amplitude=amplitude, coord_offset=coord_offset) elif mode == "no_ik": self._swingNoIk(hinge, amplitude=amplitude, coord_offset=coord_offset) elif mode == "udot_pulse": self._swingUdotPulse( hinge, amplitude=amplitude, delta_t=delta_t, coord_offset=coord_offset, render=render, step_size=step_size, ) else: raise NotImplementedError(f"{mode=}")
[docs] def swingConvel( self, lc: LoopConstraintConVel, amplitude: float, delta_t: float, render: bool, step_size: float, ): """Swing a convel loop constraint.""" try: self._notify.info(f"Driving {lc.name()} convel constraint (udot). A={amplitude}") sh = None self._kin_motion.move( lc, amplitude=amplitude, step_size=step_size, delta_t=delta_t, render=render, subhinge_index=-1, ) except ValueError as err: self._notify.error(f"Motion failed: {err}")