"""Classes and functions used to swing a joint or constraint."""
from dataclasses import dataclass, replace
from typing import Literal
import math
import numpy as np
from Karana.Dynamics import (
Multibody,
StatePropagator,
PhysicalSubhinge,
PhysicalHinge,
SpSolverType,
TimedEvent,
LoopConstraintConVel,
)
from Karana.Math import IntegratorType
from .dialog import MenuEntry, MenuTUI, ChoiceTUI, ChoiceData, EntryDialog
from .notify import Notifier
from . import terminal as term
SwingModeOptions = Literal["simple_ik", "no_ik", "udot_pulse"]
[docs]
@dataclass
class SwingConfig:
"""Configuration for controlling a swing."""
mode: SwingModeOptions = "udot_pulse"
amplitude: float = 0.4
step_size: float = 0.1
delta_t: float = 6.0
render: bool = False
[docs]
class SwingModeTUI(ChoiceTUI):
"""TUI for controlling the swing mode."""
def __init__(self, mode: SwingModeOptions):
"""Create an instance of SwingModeTUI.
Parameters
----------
mode : SwingModeOptions
The options to use for the swing mode.
"""
choices = [
ChoiceData(
brief="Kinematic pulse (udot_pulse)",
description="Do a kinematic sim, pulsing acceleration to drive the hinge",
value="udot_pulse",
),
ChoiceData(
brief="Simple (simple_ik)",
description="Continually set the hinge coordinates and do IK to solve constraints",
value="simple_ik",
),
ChoiceData(
brief="Articulate without IK (no_ik)",
description="Continually set the hinge coordinates without doing IK",
value="no_ik",
),
]
# Start with the current mode selected
index = 0
for i, choice in enumerate(choices):
if mode == choice.value:
index = i
super().__init__(
header=[
"Select swing mode",
" enter: confirm",
" q / Ctrl-C: cancel",
" ↓ / j: move down",
" ↑ / k: move up",
" 1-4: jump to choice",
],
choices=choices,
index=index,
)
[docs]
class AmplitudeDialog(EntryDialog):
"""Dialog for the swing amplitude."""
def __init__(self, amplitude):
"""Create an instance of AmplitudeDialog.
Parameters
----------
amplitude :
The amplitude to use.
"""
entry = str(amplitude)
super().__init__(prompt=["Enter the new swing amplitude (Ctrl-C to abort):"], entry=entry)
[docs]
def isValid(self) -> bool:
"""Check if the user entry is a valid amplitude.
Returns
-------
bool
True if this is a valid amplitude, False otherwise.
"""
try:
value = float(self.entry)
except ValueError:
return False
if not math.isfinite(value):
return False
return True
[docs]
class StepSizeDialog(EntryDialog):
"""Dialog for the swing step size."""
def __init__(self, step_size):
"""Create an instance of StepSizeDialog.
Parameters
----------
step_size :
The step size to use.
"""
entry = str(step_size)
super().__init__(
prompt=["Enter the new swing step size in seconds (Ctrl-C to abort):"], entry=entry
)
[docs]
def isValid(self) -> bool:
"""Check if the user entry is a valid step size.
Returns
-------
bool
True if the user entry is a valid step size, False otherwise.
"""
try:
value = float(self.entry)
except ValueError:
return False
if value <= 0:
return False
if not math.isfinite(value):
return False
return True
[docs]
class DeltaTDialog(EntryDialog):
"""Dialog for the swing delta_t."""
def __init__(self, delta_t):
"""Create an instance of DeltaTDialog.
Parameters
----------
delta_t :
The delta_t to use.
"""
entry = str(delta_t)
super().__init__(prompt=["Enter the new swing delta_t (Ctrl-C to abort):"], entry=entry)
[docs]
def isValid(self) -> bool:
"""Check if the users delta_t is valid.
Returns
-------
bool
True if the user's delta_t is valid, False otherwise.
"""
try:
value = float(self.entry)
except ValueError:
return False
if value <= 0:
return False
if not math.isfinite(value):
return False
return True
[docs]
class KinMotion:
"""Class for performing a kinematic motion on the multibody."""
def __init__(self, multibody: Multibody):
"""Create an instance of KinMotion.
Parameters
----------
multibody : Multibody
The Multibody to perform kinematic motions on.
"""
self.multibody = multibody
self.delta_t = 6.0
self.pulse_t = self.delta_t / 8.0
# the driven_subhinge is normally a subhinge, but a loop
# constraint for convels
self.driven_subhinge = None
# a negative subhinge index implies that we are driving a convel
# loop constraint
self.subhinge_index = 0
self.move_start_t = None
self.A = None
# flag to control whether to render images for a movie
self.render = False
self.gsm = None
self.state_propagator = self._setupProp()
# Add model to update plotting
self.dash_app = self.getDashApp()
from Karana.Models import DataPlotter
dp = DataPlotter("dataplotter", self.state_propagator)
dp.params.dash_app = self.dash_app
h = 0.1
dp.setPeriod(h)
def _ensureRendering(self, render: bool):
# Do nothing if the gsm model wasn't created (e.g., no scene)
if not self.gsm:
if scene := self.multibody.getScene():
# First time, create model and register
from Karana.Models import GraphicalSceneMovie
self.gsm = GraphicalSceneMovie("swing", self.state_propagator, scene)
self.gsm.params.dir = "movie"
self.gsm.params.filename_prefix = "image"
self.gsm.setPeriod(1.0 / 60.0)
self.render = True
# Early return, since we don't want any other logic the first time
# or if the gsm doesn't exist
return
# Handle edge going from False to True - register gsm
if render and not self.render:
self.render = True
self.state_propagator.registerModel(self.gsm)
# Handle edge going from True to False - unregister gsm
elif not render and self.render:
self.render = False
self.state_propagator.unregisterModel(self.gsm)
[docs]
def getDashApp(self):
"""Get the DashApp.
Returns
-------
The DashApp.
"""
def time():
return 1e-9 * float(self.state_propagator.getTime())
def xx():
print("In xx", time())
return time() * 1.1
from Karana.KUtils.DataPlotter import PlotDataDS, DashApp
multibody = self.multibody
"""
shg = multibody.getBody("part_TR_Steering.ges_steering_wheel_90").parentHinge().subhinge(0)
pdstwheel = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Steering.ges_rack_89").parentHinge().subhinge(0)
pdrack = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0)
pdwheel = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = (
multibody.getBody("part_TR_Front_Suspension.gel_lower_control_arm2_18")
.parentHinge()
.subhinge(0)
)
pdwheel2 = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0)
pdwheel1 = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Rear_Suspension.ger_spindle_80").parentHinge().subhinge(0)
pdspindle = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = (
multibody.getBody("part_TR_Rear_Suspension.ger_drive_shaft_82")
.parentHinge()
.subhinge(0)
)
pdshaft = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Powertrain.ger_diff_output_122").parentHinge().subhinge(0)
pddiff = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
dashapp = DashApp(data=[pdstwheel, pdrack, pdwheel, pddiff, pdshaft, pdspindle])
"""
import os
if os.getenv("DTEST_RUNNING", False):
# Use port 0 if running with dtest
dashapp = DashApp(data=[], port=0)
else:
dashapp = DashApp(data=[])
return dashapp
[docs]
def move(
self,
subhinge: PhysicalSubhinge,
amplitude: float,
delta_t: float,
render: bool,
subhinge_index: int,
step_size: float,
):
"""Move a subhinge through a motion.
Parameters
----------
subhinge : PhysicalSubhinge
The subhinge to move.
amplitude : float
The amplitude of the motion.
delta_t : float
The time it takes the motion to complete.
render : bool
Whether to render the motion or not.
subhinge_index : int
The index of the subhinge.
step_size : float
The step size to use for the motion.
"""
cks = self.multibody.cks()
worked = False
h = np.timedelta64(int(1e9 * step_size), "ns")
te = TimedEvent("hop_size", h + self.state_propagator.getTime(), lambda _: None, False)
te.period = h
try:
self.state_propagator.registerTimedEvent(te)
self.driven_subhinge = subhinge
self.subhinge_index = subhinge_index
self.move_start_t = 1e-9 * float(self.state_propagator.getTime())
self.A = amplitude
self.delta_t = delta_t
self.pulse_t = self.delta_t / 8.0
self._ensureRendering(render)
# prior = self.multibody.subhingeCoordData().frozenCoordsForKin()
if subhinge_index >= 0:
cks.ensureCurrent()
cks.freezeCoord(subhinge, subhinge_index)
self.state_propagator.advanceBy(self.delta_t + self.pulse_t)
if subhinge_index >= 0:
cks.unfreezeCoord(subhinge, subhinge_index)
worked = True
finally:
tnew = self.state_propagator.getTime()
# self.state_propagator.setState(tnew, self.x_init)
self.state_propagator.setTime(tnew)
self.state_propagator.setState(self.x_init)
self.state_propagator.unregisterTimedEvent(te.name(), te.isBeforeHop())
# in case there was an exception, unfreeze the subhinge coordinate
if not worked and subhinge_index >= 0:
cks.unfreezeCoord(subhinge, subhinge_index)
if scene := self.multibody.getScene():
scene.update()
def _getUdot(self, sp_t):
"""Return the (u, udot) value for the current time for this profile."""
t = 1e-9 * float(sp_t) - self.move_start_t
a = self.A * 0.2
if t >= 0 and t < self.pulse_t:
# initial acceleration to go forward
udot = a
u = a * t
elif t >= self.pulse_t and t < (self.delta_t / 2 - self.pulse_t / 2):
# coasting forward till start of slow down and reversal
udot = 0
u = a * self.pulse_t
elif t >= self.delta_t / 2 - self.pulse_t / 2.0 and t < (
self.delta_t / 2 + self.pulse_t / 2
):
# ramp down to start going backwards
udot = -2 * a
u = a * self.pulse_t - a * (t - self.delta_t / 2 - self.pulse_t / 2.0)
elif t >= (self.delta_t / 2 + self.pulse_t / 2) and t < (self.delta_t - self.pulse_t):
# coasting backward till start of slow down to rest
udot = 0
u = -a * self.pulse_t
elif t >= self.delta_t - self.pulse_t and t <= self.delta_t:
# decelerate to come to a stop
udot = a
u = -a * (self.delta_t - t)
else:
#
udot = 0
u = 0
# print("JJJ", t, u, udot)
return u, udot
def _preDeriv(self, t, _):
u, udot = self._getUdot(t)
delt = 1e-9 * float(t) - self.move_start_t
if self.subhinge_index >= 0:
# regular non-convel subhinge
udotvec = np.zeros(self.driven_subhinge.nU())
udotvec[self.subhinge_index] = udot
self.driven_subhinge.setUdot(udotvec)
else:
# driving a convel constraint
self.driven_subhinge.setInput(u, udot)
def _setupProp(self) -> StatePropagator:
from Karana.Models import (
ProjectConstraintError,
UpdateProxyScene,
SyncRealTime,
TimeDisplay,
)
sp = StatePropagator(
self.multibody, IntegratorType.EULER, None, None, SpSolverType.KINEMATICS
)
# sp.setKinematicsMode(True)
sp.fns.pre_deriv_fns["set_udot"] = self._preDeriv
t_init = np.timedelta64(0, "ns")
# sp.setState(t_init, self.x_init)
sp.setTime(t_init)
self.x_init = sp.assembleState()
sp.setState(self.x_init)
if scene := self.multibody.getScene():
# Add model to update the scene
model = UpdateProxyScene("update_scene", sp, scene)
# Add model to limit speed
model = SyncRealTime("sync_time", sp)
# Add model to show sim time
model = TimeDisplay("sim_time", sp, scene.graphics())
# Add model to fix constraint violation errors
# FIXME: multibody.cks() fails if there are no constraints
try:
cks = self.multibody.cks()
except ValueError:
pass
else:
model = ProjectConstraintError(
"sd", sp, self.multibody, sp.getIntegrator(), self.multibody.cks()
)
model.params.tol = 1e-6
return sp
[docs]
class SwingManager:
"""Manager for swing motions."""
def __init__(self, multibody: Multibody, notify: Notifier):
"""Create an instance of SwingManager.
Parameters
----------
multibody : Multibody
The multibody to manage swings for.
notify : Notifier
The notifier to send messages to.
"""
self._multibody = multibody
self._kin_motion = KinMotion(multibody)
self._notify = notify
def _swingSimpleIk(self, hinge, amplitude, coord_offset=0):
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset}. A={amplitude}"
)
self._multibody.articulateSubhinge(
sh,
subhinge_index=subhinge_offset,
rangeQ=amplitude,
dQ=0.04,
pause=0.02,
disable_ik=False,
)
def _swingNoIk(self, hinge, amplitude, coord_offset=0):
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset} (no IK). A={amplitude}"
)
self._multibody.articulateSubhinge(
sh,
subhinge_index=subhinge_offset,
rangeQ=amplitude,
dQ=0.04,
pause=0,
disable_ik=True,
)
def _swingUdotPulse(self, hinge, amplitude, step_size, delta_t, render, coord_offset=0):
try:
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
# print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset} (udot). A={amplitude}"
)
self._kin_motion.move(
sh,
amplitude=amplitude,
step_size=step_size,
delta_t=delta_t,
render=render,
subhinge_index=subhinge_offset,
)
except ValueError as err:
self._notify.error(f"Motion failed: {err}")
[docs]
def swingHinge(
self,
hinge: PhysicalHinge,
mode: SwingModeOptions,
amplitude: float,
step_size: float,
delta_t: float,
render: bool,
coord_offset: int,
):
"""Swing a hinge.
Parameters
----------
hinge : PhysicalHinge
The hinge to swing.
mode : SwingModeOptions
The mode used for the swing.
amplitude : float
The amplitude of the swing.
step_size : float
The step size for the swing.
delta_t : float
The time taken to complete the swing.
render : bool
Whether to render the swing or not.
coord_offset : int
The coordinate offset of the value changing during the swing.
"""
if coord_offset >= hinge.coordData().nU():
self._notify.error(
f"The requested hinge coordinate {coord_offset} must be less than {hinge.coordData().nU()} - the number of hinge coordinates."
)
return
if mode == "simple_ik":
self._swingSimpleIk(hinge, amplitude=amplitude, coord_offset=coord_offset)
elif mode == "no_ik":
self._swingNoIk(hinge, amplitude=amplitude, coord_offset=coord_offset)
elif mode == "udot_pulse":
self._swingUdotPulse(
hinge,
amplitude=amplitude,
delta_t=delta_t,
coord_offset=coord_offset,
render=render,
step_size=step_size,
)
else:
raise NotImplementedError(f"{mode=}")
[docs]
def swingConvel(
self,
lc: LoopConstraintConVel,
amplitude: float,
delta_t: float,
render: bool,
step_size: float,
):
"""Swing a convel loop constraint."""
try:
self._notify.info(f"Driving {lc.name()} convel constraint (udot). A={amplitude}")
sh = None
self._kin_motion.move(
lc,
amplitude=amplitude,
step_size=step_size,
delta_t=delta_t,
render=render,
subhinge_index=-1,
)
except ValueError as err:
self._notify.error(f"Motion failed: {err}")