Source code for Karana.KUtils.MultibodyTUI.swing

from dataclasses import dataclass, replace
from typing import Literal
import math

import numpy as np

from Karana.Dynamics import (
    Multibody,
    StatePropagator,
    PhysicalSubhinge,
    PhysicalHinge,
    SPSOLVER_TYPE,
    TimedEvent,
    LoopConstraintConVel,
)
from Karana.Math import IntegratorType

from .dialog import MenuEntry, MenuTUI, ChoiceTUI, ChoiceData, EntryDialog
from .notify import Notifier
from . import terminal as term

SwingModeOptions = Literal["simple_ik", "no_ik", "udot_pulse"]


[docs] @dataclass class SwingConfig: mode: SwingModeOptions = "udot_pulse" amplitude: float = 0.4 step_size: float = 0.1 deltaT: float = 6.0 render: bool = False
[docs] class SwingConfigMenuTUI(MenuTUI): """The top-level menu for configuring swing settings""" def __init__(self, init_config: SwingConfig | None = None): if init_config: # Make a copy we can safely modify self._config = replace(init_config) else: self._config = SwingConfig() self._modified = False self._confirmed = False self._mode_entry = MenuEntry( callback=self._handle_mode, brief=f"Swing mode [{self._config.mode}]", description="Type of swing (press enter for details)", ) self._amplitude_entry = MenuEntry( callback=self._handle_amplitude, brief=f"Amplitude [{self._config.amplitude}]", description="Amplitude of the swing", ) self._step_size_entry = MenuEntry( callback=self._handle_step_size, brief=f"Step size [{self._config.step_size}]", description="Step size of the swing (only in pulse_udot mode)", ) self._deltaT_entry = MenuEntry( callback=self._handle_deltaT, brief=f"DeltaT [{self._config.deltaT}]", description="Period of the swing (only in pulse_udot mode)", ) self._render_entry = MenuEntry( callback=self._handle_render, brief=f"Render images [{self._config.render}]", description="Whether to render the visualization to a series of images files (only in pulse_udot mode)", ) super().__init__( header=[ "Swing configuration menu", " enter: edit/confirm option", " q / Ctrl-C: cancel", " ↓ / j: move down", " ↑ / k: move up", " 1-7: jump to choice", ], entries=[ self._mode_entry, self._amplitude_entry, self._step_size_entry, self._deltaT_entry, self._render_entry, MenuEntry( callback=self._handle_cancel, brief="Cancel", done=True, ), MenuEntry( callback=self._handle_submit, brief=term.bold("Apply changes"), done=True, ), ], ) @property def config(self) -> SwingConfig | None: if not self._confirmed: return None return replace(self._config) def _handle_mode(self): tui = SwingModeTUI(mode=self._config.mode) if choice := tui.choice: mode = choice.value self._modified = True self._config.mode = mode self._mode_entry.brief = f"Swing mode [{mode}*]" def _handle_amplitude(self): tui = AmplitudeDialog(amplitude=self._config.amplitude) entry = tui.run() if entry is None: return self._modified = True amplitude = float(entry) self._config.amplitude = amplitude self._amplitude_entry.brief = f"Amplitude [{amplitude}*]" def _handle_step_size(self): tui = StepSizeDialog(step_size=self._config.step_size) entry = tui.run() if entry is None: return self._modified = True step_size = float(entry) self._config.step_size = step_size self._step_size_entry.brief = f"Step size [{step_size}*]" def _handle_deltaT(self): tui = DeltaTDialog(deltaT=self._config.deltaT) entry = tui.run() if entry is None: return self._modified = True deltaT = float(entry) self._config.deltaT = deltaT self._deltaT_entry.brief = f"DeltaT [{deltaT}*]" def _handle_render(self): self._modified = True render = not self._config.render self._config.render = render self._render_entry.brief = f"Render images [{render}*]" def _handle_cancel(self): pass def _handle_submit(self): self._confirmed = True
[docs] def footer(self) -> list[str]: callback = self.current.callback if callback == self._handle_cancel: if self._modified: return [term.error("Press enter to discard changes")] else: return [] if callback == self._handle_submit: if self._modified: return [term.info("Press enter to apply changes")] else: return [] return [term.info("Press enter to modify")]
[docs] class SwingModeTUI(ChoiceTUI): def __init__(self, mode: SwingModeOptions): choices = [ ChoiceData( brief="Kinematic pulse (udot_pulse)", description="Do a kinematic sim, pulsing acceleration to drive the hinge", value="udot_pulse", ), ChoiceData( brief="Simple (simple_ik)", description="Continually set the hinge coordinates and do IK to solve constraints", value="simple_ik", ), ChoiceData( brief="Articulate without IK (no_ik)", description="Continually set the hinge coordinates without doing IK", value="no_ik", ), ] # Start with the current mode selected index = 0 for i, choice in enumerate(choices): if mode == choice.value: index = i super().__init__( header=[ "Select swing mode", " enter: confirm", " q / Ctrl-C: cancel", " ↓ / j: move down", " ↑ / k: move up", " 1-4: jump to choice", ], choices=choices, index=index, )
[docs] class AmplitudeDialog(EntryDialog): def __init__(self, amplitude): entry = str(amplitude) super().__init__(prompt=["Enter the new swing amplitude (Ctrl-C to abort):"], entry=entry)
[docs] def is_valid(self) -> bool: try: value = float(self.entry) except ValueError: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: if not self.entry: return ["Please type a number"] if not self.is_valid(): return [term.warn("Invalid amplitude - must be a number")] return ["Press enter to confirm"]
[docs] class StepSizeDialog(EntryDialog): def __init__(self, step_size): entry = str(step_size) super().__init__( prompt=["Enter the new swing step size in seconds (Ctrl-C to abort):"], entry=entry )
[docs] def is_valid(self) -> bool: try: value = float(self.entry) except ValueError: return False if value <= 0: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: if not self.entry: return ["Please type a positive number"] if not self.is_valid(): return [term.warn("Invalid step size - must be a positive number")] return ["Press enter to confirm"]
[docs] class DeltaTDialog(EntryDialog): def __init__(self, deltaT): entry = str(deltaT) super().__init__(prompt=["Enter the new swing deltaT (Ctrl-C to abort):"], entry=entry)
[docs] def is_valid(self) -> bool: try: value = float(self.entry) except ValueError: return False if value <= 0: return False if not math.isfinite(value): return False return True
[docs] def footer(self) -> list[str]: if not self.entry: return ["Please type a positive number"] if not self.is_valid(): return [term.warn("Invalid deltaT - must be a positive number")] return ["Press enter to confirm"]
[docs] class KinMotion: def __init__(self, multibody: Multibody): self.multibody = multibody self.x_init = self.multibody.dynamicsToState() self.deltaT = 6.0 self.pulseT = self.deltaT / 8.0 # the driven_subhinge is normally a subhinge, but a loop # constraint for convels self.driven_subhinge = None # a negative subhinge index implies that we are driving a convel # loop constraint self.subhinge_index = 0 self.move_start_t = None self.A = None # flag to control whether to render images for a movie self.render = False self.gsm = None # FIXME: this state_propagator and the registered models # are preventing a proper cleanup self.state_propagator = self._setup_prop() # Add model to update plotting self.dash_app = self.getDashApp() from Karana.Models import DataPlotter dp = DataPlotter("dataplotter", self.state_propagator) dp.params.dash_app = self.dash_app h = 0.1 dp.setPeriod(h) def _ensureRendering(self, render: bool): # Do nothing if the gsm model wasn't created (eg: no scene) if not self.gsm: if scene := self.multibody.getScene(): # First time, create model and register from Karana.Models import GraphicalSceneMovie self.gsm = GraphicalSceneMovie("swing", self.state_propagator, scene) self.gsm.params.dir = "movie" self.gsm.params.filename_prefix = "image" self.gsm.setPeriod(1.0 / 60.0) self.render = True # Early return, since we don't want any other logic the first time # or if the gsm doesn't exist return # Handle edge going from False to True - register gsm if render and not self.render: self.render = True self.state_propagator.registerModel(self.gsm) # Handle edge going from True to False - unregister gsm elif not render and self.render: self.render = False self.state_propagator.unregisterModel(self.gsm)
[docs] def getDashApp(self): def time(): return 1e-9 * float(self.state_propagator.getTime()) def xx(): print("In xx", time()) return time() * 1.1 from Karana.KUtils.DataPlotter import PlotDataDS, DashApp multibody = self.multibody """ shg = multibody.getBody("part_TR_Steering.ges_steering_wheel_90").parentHinge().subhinge(0) pdstwheel = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Steering.ges_rack_89").parentHinge().subhinge(0) pdrack = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0) pdwheel = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = ( multibody.getBody("part_TR_Front_Suspension.gel_lower_control_arm2_18") .parentHinge() .subhinge(0) ) pdwheel2 = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0) pdwheel1 = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Rear_Suspension.ger_spindle_80").parentHinge().subhinge(0) pdspindle = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = ( multibody.getBody("part_TR_Rear_Suspension.ger_drive_shaft_82") .parentHinge() .subhinge(0) ) pdshaft = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) shg = multibody.getBody("part_TR_Powertrain.ger_diff_output_122").parentHinge().subhinge(0) pddiff = PlotDataDS( x_data_name="t", x_data=time, y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot}, title=shg.name(), ) dashapp = DashApp(data=[pdstwheel, pdrack, pdwheel, pddiff, pdshaft, pdspindle]) """ dashapp = DashApp(data=[]) return dashapp
[docs] def move( self, subhinge: PhysicalSubhinge, amplitude: float, deltaT: float, render: bool, subhinge_index: int, step_size: float, ): cks = self.multibody.cks() worked = False h = np.timedelta64(int(1e9 * step_size), "ns") te = TimedEvent("hop_size", h + self.state_propagator.getTime(), lambda _: None, False) te.period = h try: self.state_propagator.registerTimedEvent(te) self.driven_subhinge = subhinge self.subhinge_index = subhinge_index self.move_start_t = 1e-9 * float(self.state_propagator.getTime()) self.A = amplitude self.deltaT = deltaT self.pulseT = self.deltaT / 8.0 self._ensureRendering(render) # prior = self.multibody.subhingeCoordData().frozenCoordsForKin() if subhinge_index >= 0: cks.ensureCurrent() cks.freezeCoord(subhinge, subhinge_index) self.state_propagator.advanceBy(self.deltaT + self.pulseT) if subhinge_index >= 0: cks.unfreezeCoord(subhinge, subhinge_index) worked = True finally: tnew = self.state_propagator.getTime() # self.state_propagator.setState(tnew, self.x_init) self.state_propagator.setTime(tnew) self.state_propagator.setState(self.x_init) self.state_propagator.unregisterTimedEvent(te.name(), te.isBeforeHop()) self.multibody.stateToDynamics(self.x_init) # in case there was an exception, unfreeze the subhinge coordinate if not worked and subhinge_index >= 0: cks.unfreezeCoord(subhinge, subhinge_index) if scene := self.multibody.getScene(): scene.update()
def _get_udot(self, sp_t): """ Return the (u, udot) value for the current time for this profile """ t = 1e-9 * float(sp_t) - self.move_start_t a = self.A * 0.2 if t >= 0 and t < self.pulseT: # initial acceleration to go forward udot = a u = a * t elif t >= self.pulseT and t < (self.deltaT / 2 - self.pulseT / 2): # coasting forward till start of slow down and reversal udot = 0 u = a * self.pulseT elif t >= self.deltaT / 2 - self.pulseT / 2.0 and t < (self.deltaT / 2 + self.pulseT / 2): # ramp down to start going backwards udot = -2 * a u = a * self.pulseT - a * (t - self.deltaT / 2 - self.pulseT / 2.0) elif t >= (self.deltaT / 2 + self.pulseT / 2) and t < (self.deltaT - self.pulseT): # coasting backward till start of slow down to rest udot = 0 u = -a * self.pulseT elif t >= self.deltaT - self.pulseT and t <= self.deltaT: # decelerate to come to a stop udot = a u = -a * (self.deltaT - t) else: # udot = 0 u = 0 # print("JJJ", t, u, udot) return u, udot def _pre_deriv(self, t, _): u, udot = self._get_udot(t) delt = 1e-9 * float(t) - self.move_start_t if self.subhinge_index >= 0: # regular non-convel subhinge udotvec = np.zeros(self.driven_subhinge.nU()) udotvec[self.subhinge_index] = udot self.driven_subhinge.setUdot(udotvec) else: # driving a convel constraint self.driven_subhinge.setInput(u, udot) def _setup_prop(self) -> StatePropagator: from Karana.Models import ( ProjectConstraintError, UpdateProxyScene, SyncRealTime, TimeDisplay, ) sp = StatePropagator( self.multibody, IntegratorType.EULER, None, None, SPSOLVER_TYPE.KINEMATICS ) # sp.setKinematicsMode(True) sp.getOptions().fns.pre_deriv_fns["set_udot"] = self._pre_deriv t_init = np.timedelta64(0, "ns") # sp.setState(t_init, self.x_init) sp.setTime(t_init) sp.setState(self.x_init) if scene := self.multibody.getScene(): # Add model to update the scene model = UpdateProxyScene("update_scene", sp, scene) # Add model to limit speed model = SyncRealTime("sync_time", sp) # Add model to show sim time model = TimeDisplay("sim_time", sp, scene.graphics()) # Add model to fix constraint violation errors # FIXME: multibody.cks() fails if there are no constraints try: cks = self.multibody.cks() except ValueError: pass else: model = ProjectConstraintError( "sd", sp, self.multibody, sp.getIntegrator(), self.multibody.cks() ) model.params.tol = 1e-6 return sp
[docs] class SwingManager: def __init__(self, multibody: Multibody, notify: Notifier): self._multibody = multibody self._kin_motion = KinMotion(multibody) self._notify = notify def _swing_simple_ik(self, hinge, amplitude, coord_offset=0): # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset}. A={amplitude}" ) self._multibody.articulateSubhinge( sh, subhinge_index=subhinge_offset, rangeQ=amplitude, dQ=0.04, pause=0.02, disable_ik=False, ) def _swing_no_ik(self, hinge, amplitude, coord_offset=0): # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset} (no IK). A={amplitude}" ) self._multibody.articulateSubhinge( sh, subhinge_index=subhinge_offset, rangeQ=amplitude, dQ=0.04, pause=0, disable_ik=True, ) def _swing_udot_pulse(self, hinge, amplitude, step_size, deltaT, render, coord_offset=0): try: # get the subhinge and subhinge coordinate offset for the hinge coord_offset sh, subhinge_offset = hinge.coordData().coordAt(coord_offset) # print(f"DOING {sh.name()}/{subhinge_offset}") self._notify.info( f"Articulating {sh.name()} subhinge offset={subhinge_offset} (udot). A={amplitude}" ) self._kin_motion.move( sh, amplitude=amplitude, step_size=step_size, deltaT=deltaT, render=render, subhinge_index=subhinge_offset, ) except ValueError as err: self._notify.error(f"Motion failed: {err}")
[docs] def swingHinge( self, hinge: PhysicalHinge, mode: SwingModeOptions, amplitude: float, step_size: float, deltaT: float, render: bool, coord_offset: int, ): if coord_offset >= hinge.coordData().nU(): self._notify.error( f"The requested hinge coordinate {coord_offset} must be less than {hinge.coordData().nU()} - the number of hinge coordinates." ) return if mode == "simple_ik": self._swing_simple_ik(hinge, amplitude=amplitude, coord_offset=coord_offset) elif mode == "no_ik": self._swing_no_ik(hinge, amplitude=amplitude, coord_offset=coord_offset) elif mode == "udot_pulse": self._swing_udot_pulse( hinge, amplitude=amplitude, deltaT=deltaT, coord_offset=coord_offset, render=render, step_size=step_size, ) else: raise NotImplementedError(f"{mode=}")
[docs] def swingConvel( self, lc: LoopConstraintConVel, amplitude: float, deltaT: float, render: bool, step_size: float, ): """ Swing a convel loop constraint """ try: self._notify.info(f"Driving {lc.name()} convel constraint (udot). A={amplitude}") sh = None self._kin_motion.move( lc, amplitude=amplitude, step_size=step_size, deltaT=deltaT, render=render, subhinge_index=-1, ) except ValueError as err: self._notify.error(f"Motion failed: {err}")