from dataclasses import dataclass, replace
from typing import Literal
import math
import numpy as np
from Karana.Dynamics import (
Multibody,
StatePropagator,
PhysicalSubhinge,
PhysicalHinge,
SPSOLVER_TYPE,
TimedEvent,
LoopConstraintConVel,
)
from Karana.Math import IntegratorType
from .dialog import MenuEntry, MenuTUI, ChoiceTUI, ChoiceData, EntryDialog
from .notify import Notifier
from . import terminal as term
SwingModeOptions = Literal["simple_ik", "no_ik", "udot_pulse"]
[docs]
@dataclass
class SwingConfig:
mode: SwingModeOptions = "udot_pulse"
amplitude: float = 0.4
step_size: float = 0.1
deltaT: float = 6.0
render: bool = False
[docs]
class SwingModeTUI(ChoiceTUI):
def __init__(self, mode: SwingModeOptions):
choices = [
ChoiceData(
brief="Kinematic pulse (udot_pulse)",
description="Do a kinematic sim, pulsing acceleration to drive the hinge",
value="udot_pulse",
),
ChoiceData(
brief="Simple (simple_ik)",
description="Continually set the hinge coordinates and do IK to solve constraints",
value="simple_ik",
),
ChoiceData(
brief="Articulate without IK (no_ik)",
description="Continually set the hinge coordinates without doing IK",
value="no_ik",
),
]
# Start with the current mode selected
index = 0
for i, choice in enumerate(choices):
if mode == choice.value:
index = i
super().__init__(
header=[
"Select swing mode",
" enter: confirm",
" q / Ctrl-C: cancel",
" ↓ / j: move down",
" ↑ / k: move up",
" 1-4: jump to choice",
],
choices=choices,
index=index,
)
[docs]
class AmplitudeDialog(EntryDialog):
def __init__(self, amplitude):
entry = str(amplitude)
super().__init__(prompt=["Enter the new swing amplitude (Ctrl-C to abort):"], entry=entry)
[docs]
def is_valid(self) -> bool:
try:
value = float(self.entry)
except ValueError:
return False
if not math.isfinite(value):
return False
return True
[docs]
class StepSizeDialog(EntryDialog):
def __init__(self, step_size):
entry = str(step_size)
super().__init__(
prompt=["Enter the new swing step size in seconds (Ctrl-C to abort):"], entry=entry
)
[docs]
def is_valid(self) -> bool:
try:
value = float(self.entry)
except ValueError:
return False
if value <= 0:
return False
if not math.isfinite(value):
return False
return True
[docs]
class DeltaTDialog(EntryDialog):
def __init__(self, deltaT):
entry = str(deltaT)
super().__init__(prompt=["Enter the new swing deltaT (Ctrl-C to abort):"], entry=entry)
[docs]
def is_valid(self) -> bool:
try:
value = float(self.entry)
except ValueError:
return False
if value <= 0:
return False
if not math.isfinite(value):
return False
return True
[docs]
class KinMotion:
def __init__(self, multibody: Multibody):
self.multibody = multibody
self.x_init = self.multibody.dynamicsToState()
self.deltaT = 6.0
self.pulseT = self.deltaT / 8.0
# the driven_subhinge is normally a subhinge, but a loop
# constraint for convels
self.driven_subhinge = None
# a negative subhinge index implies that we are driving a convel
# loop constraint
self.subhinge_index = 0
self.move_start_t = None
self.A = None
# flag to control whether to render images for a movie
self.render = False
self.gsm = None
# FIXME: this state_propagator and the registered models
# are preventing a proper cleanup
self.state_propagator = self._setup_prop()
# Add model to update plotting
self.dash_app = self.getDashApp()
from Karana.Models import DataPlotter
dp = DataPlotter("dataplotter", self.state_propagator)
dp.params.dash_app = self.dash_app
h = 0.1
dp.setPeriod(h)
def _ensureRendering(self, render: bool):
# Do nothing if the gsm model wasn't created (eg: no scene)
if not self.gsm:
if scene := self.multibody.getScene():
# First time, create model and register
from Karana.Models import GraphicalSceneMovie
self.gsm = GraphicalSceneMovie("swing", self.state_propagator, scene)
self.gsm.params.dir = "movie"
self.gsm.params.filename_prefix = "image"
self.gsm.setPeriod(1.0 / 60.0)
self.render = True
# Early return, since we don't want any other logic the first time
# or if the gsm doesn't exist
return
# Handle edge going from False to True - register gsm
if render and not self.render:
self.render = True
self.state_propagator.registerModel(self.gsm)
# Handle edge going from True to False - unregister gsm
elif not render and self.render:
self.render = False
self.state_propagator.unregisterModel(self.gsm)
[docs]
def getDashApp(self):
def time():
return 1e-9 * float(self.state_propagator.getTime())
def xx():
print("In xx", time())
return time() * 1.1
from Karana.KUtils.DataPlotter import PlotDataDS, DashApp
multibody = self.multibody
"""
shg = multibody.getBody("part_TR_Steering.ges_steering_wheel_90").parentHinge().subhinge(0)
pdstwheel = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Steering.ges_rack_89").parentHinge().subhinge(0)
pdrack = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0)
pdwheel = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = (
multibody.getBody("part_TR_Front_Suspension.gel_lower_control_arm2_18")
.parentHinge()
.subhinge(0)
)
pdwheel2 = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Front_Suspension.gel_upright_26").parentHinge().subhinge(0)
pdwheel1 = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Rear_Suspension.ger_spindle_80").parentHinge().subhinge(0)
pdspindle = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = (
multibody.getBody("part_TR_Rear_Suspension.ger_drive_shaft_82")
.parentHinge()
.subhinge(0)
)
pdshaft = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
shg = multibody.getBody("part_TR_Powertrain.ger_diff_output_122").parentHinge().subhinge(0)
pddiff = PlotDataDS(
x_data_name="t",
x_data=time,
y_data={"q": shg.getQ, "u": shg.getU, "udot": shg.getUdot},
title=shg.name(),
)
dashapp = DashApp(data=[pdstwheel, pdrack, pdwheel, pddiff, pdshaft, pdspindle])
"""
dashapp = DashApp(data=[])
return dashapp
[docs]
def move(
self,
subhinge: PhysicalSubhinge,
amplitude: float,
deltaT: float,
render: bool,
subhinge_index: int,
step_size: float,
):
cks = self.multibody.cks()
worked = False
h = np.timedelta64(int(1e9 * step_size), "ns")
te = TimedEvent("hop_size", h + self.state_propagator.getTime(), lambda _: None, False)
te.period = h
try:
self.state_propagator.registerTimedEvent(te)
self.driven_subhinge = subhinge
self.subhinge_index = subhinge_index
self.move_start_t = 1e-9 * float(self.state_propagator.getTime())
self.A = amplitude
self.deltaT = deltaT
self.pulseT = self.deltaT / 8.0
self._ensureRendering(render)
# prior = self.multibody.subhingeCoordData().frozenCoordsForKin()
if subhinge_index >= 0:
cks.ensureCurrent()
cks.freezeCoord(subhinge, subhinge_index)
self.state_propagator.advanceBy(self.deltaT + self.pulseT)
if subhinge_index >= 0:
cks.unfreezeCoord(subhinge, subhinge_index)
worked = True
finally:
tnew = self.state_propagator.getTime()
# self.state_propagator.setState(tnew, self.x_init)
self.state_propagator.setTime(tnew)
self.state_propagator.setState(self.x_init)
self.state_propagator.unregisterTimedEvent(te.name(), te.isBeforeHop())
self.multibody.stateToDynamics(self.x_init)
# in case there was an exception, unfreeze the subhinge coordinate
if not worked and subhinge_index >= 0:
cks.unfreezeCoord(subhinge, subhinge_index)
if scene := self.multibody.getScene():
scene.update()
def _get_udot(self, sp_t):
"""
Return the (u, udot) value for the current time for this profile
"""
t = 1e-9 * float(sp_t) - self.move_start_t
a = self.A * 0.2
if t >= 0 and t < self.pulseT:
# initial acceleration to go forward
udot = a
u = a * t
elif t >= self.pulseT and t < (self.deltaT / 2 - self.pulseT / 2):
# coasting forward till start of slow down and reversal
udot = 0
u = a * self.pulseT
elif t >= self.deltaT / 2 - self.pulseT / 2.0 and t < (self.deltaT / 2 + self.pulseT / 2):
# ramp down to start going backwards
udot = -2 * a
u = a * self.pulseT - a * (t - self.deltaT / 2 - self.pulseT / 2.0)
elif t >= (self.deltaT / 2 + self.pulseT / 2) and t < (self.deltaT - self.pulseT):
# coasting backward till start of slow down to rest
udot = 0
u = -a * self.pulseT
elif t >= self.deltaT - self.pulseT and t <= self.deltaT:
# decelerate to come to a stop
udot = a
u = -a * (self.deltaT - t)
else:
#
udot = 0
u = 0
# print("JJJ", t, u, udot)
return u, udot
def _pre_deriv(self, t, _):
u, udot = self._get_udot(t)
delt = 1e-9 * float(t) - self.move_start_t
if self.subhinge_index >= 0:
# regular non-convel subhinge
udotvec = np.zeros(self.driven_subhinge.nU())
udotvec[self.subhinge_index] = udot
self.driven_subhinge.setUdot(udotvec)
else:
# driving a convel constraint
self.driven_subhinge.setInput(u, udot)
def _setup_prop(self) -> StatePropagator:
from Karana.Models import (
ProjectConstraintError,
UpdateProxyScene,
SyncRealTime,
TimeDisplay,
)
sp = StatePropagator(
self.multibody, IntegratorType.EULER, None, None, SPSOLVER_TYPE.KINEMATICS
)
# sp.setKinematicsMode(True)
sp.getOptions().fns.pre_deriv_fns["set_udot"] = self._pre_deriv
t_init = np.timedelta64(0, "ns")
# sp.setState(t_init, self.x_init)
sp.setTime(t_init)
sp.setState(self.x_init)
if scene := self.multibody.getScene():
# Add model to update the scene
model = UpdateProxyScene("update_scene", sp, scene)
# Add model to limit speed
model = SyncRealTime("sync_time", sp)
# Add model to show sim time
model = TimeDisplay("sim_time", sp, scene.graphics())
# Add model to fix constraint violation errors
# FIXME: multibody.cks() fails if there are no constraints
try:
cks = self.multibody.cks()
except ValueError:
pass
else:
model = ProjectConstraintError(
"sd", sp, self.multibody, sp.getIntegrator(), self.multibody.cks()
)
model.params.tol = 1e-6
return sp
[docs]
class SwingManager:
def __init__(self, multibody: Multibody, notify: Notifier):
self._multibody = multibody
self._kin_motion = KinMotion(multibody)
self._notify = notify
def _swing_simple_ik(self, hinge, amplitude, coord_offset=0):
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset}. A={amplitude}"
)
self._multibody.articulateSubhinge(
sh,
subhinge_index=subhinge_offset,
rangeQ=amplitude,
dQ=0.04,
pause=0.02,
disable_ik=False,
)
def _swing_no_ik(self, hinge, amplitude, coord_offset=0):
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset} (no IK). A={amplitude}"
)
self._multibody.articulateSubhinge(
sh,
subhinge_index=subhinge_offset,
rangeQ=amplitude,
dQ=0.04,
pause=0,
disable_ik=True,
)
def _swing_udot_pulse(self, hinge, amplitude, step_size, deltaT, render, coord_offset=0):
try:
# get the subhinge and subhinge coordinate offset for the hinge coord_offset
sh, subhinge_offset = hinge.coordData().coordAt(coord_offset)
# print(f"DOING {sh.name()}/{subhinge_offset}")
self._notify.info(
f"Articulating {sh.name()} subhinge offset={subhinge_offset} (udot). A={amplitude}"
)
self._kin_motion.move(
sh,
amplitude=amplitude,
step_size=step_size,
deltaT=deltaT,
render=render,
subhinge_index=subhinge_offset,
)
except ValueError as err:
self._notify.error(f"Motion failed: {err}")
[docs]
def swingHinge(
self,
hinge: PhysicalHinge,
mode: SwingModeOptions,
amplitude: float,
step_size: float,
deltaT: float,
render: bool,
coord_offset: int,
):
if coord_offset >= hinge.coordData().nU():
self._notify.error(
f"The requested hinge coordinate {coord_offset} must be less than {hinge.coordData().nU()} - the number of hinge coordinates."
)
return
if mode == "simple_ik":
self._swing_simple_ik(hinge, amplitude=amplitude, coord_offset=coord_offset)
elif mode == "no_ik":
self._swing_no_ik(hinge, amplitude=amplitude, coord_offset=coord_offset)
elif mode == "udot_pulse":
self._swing_udot_pulse(
hinge,
amplitude=amplitude,
deltaT=deltaT,
coord_offset=coord_offset,
render=render,
step_size=step_size,
)
else:
raise NotImplementedError(f"{mode=}")
[docs]
def swingConvel(
self,
lc: LoopConstraintConVel,
amplitude: float,
deltaT: float,
render: bool,
step_size: float,
):
"""
Swing a convel loop constraint
"""
try:
self._notify.info(f"Driving {lc.name()} convel constraint (udot). A={amplitude}")
sh = None
self._kin_motion.move(
lc,
amplitude=amplitude,
step_size=step_size,
deltaT=deltaT,
render=render,
subhinge_index=-1,
)
except ValueError as err:
self._notify.error(f"Motion failed: {err}")