# Copyright (c) 2024-2025 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Classes and functions used to convert a NASTRAN model into a flexible body model.
The flexible body model can then be transformed into a PhysicalModalBody with a single line.
This is the easiest and recommended way to bring NASTRAN models into Karana Dynamics simulations.
"""
import numpy as np
from Karana.KUtils.DataStruct import DataStruct
from Karana.KUtils.Ktyping import NonNegativeInt, NonNegativeFloat
from Karana.Core import warn
from typing import Optional, Annotated
from pydantic import model_validator, Field, FilePath
from pathlib import Path
from pyNastran.bdf.bdf import BDF
import re
import os
import h5py
from subprocess import run
from pyNastran.op2.op2 import OP2
from copy import deepcopy
from textwrap import wrap
[docs]
class ModalAnalysisDS(DataStruct):
"""Parameters used for modal analysis.
Parameters
----------
input_file : FilePath
Input NASTRAN run deck or bulk data file.
analysis_dir: Path
Analysis directory. If a path is not is not specified, will use analysis_INPUT_FILE_NAME.
output_file : Path
Output HDF5 file.
grdpnt : Annotated[int, Field(ge=0)]
Reference point. If 0, then use the origin of the basic coordinate system.
n_modes: Optional[Annotated[int, Field(ge=1)]]
Number of modes to keep. The Nastran QRG specifies how this parameter works with
the frequency lower bound and upper bound.
rigid_mode_cutoff: NonNegativeFloat
The rigid mode cuttoff to use in radians.
freq_lb: Optional[NonNegativeFloat]
Lower bound for modes.
freq_ub: Optional[NonNegativeFloat]
Upper bound for modes.
aset: dict[NonNegativeInt, int]
Nodes to include in the aset.
rvdof: dict[NonNegativeInt, int]
Nodes to include in the aset.
output_cs: Optional[int]
Number of the output coordinate system. This will be assigned to the CD field
of the GRDSET card.
stress_recovery_op2: bool
If True, then add commands to produce an op2 file that can be used to recover stress. Note, that at the time this is written, pyNastran cannot read these op2 files. Hence, for stress recovery, one must run the analysis twice. Once in stres recovery mode and once in regular mode.
"""
input_file: FilePath
analysis_dir: Path = Path("DEFAULT")
output_file: Path
grdpnt: NonNegativeInt = 0
n_modes: Optional[Annotated[int, Field(ge=1)]] = None
rigid_mode_cutoff: NonNegativeFloat = 1e-3
freq_lb: Optional[NonNegativeFloat] = None
freq_ub: Optional[NonNegativeFloat] = None
aset: dict[NonNegativeInt, int] = {}
rvdof: dict[NonNegativeInt, int] = {}
output_cs: Optional[int] = None
stress_recovery_op2: bool = False
[docs]
@model_validator(mode="after")
def postValidate(self):
"""Ensure the upper frequency bound is >= the lower frequency bound."""
if self.freq_ub is not None and self.freq_lb is not None:
if self.freq_ub <= self.freq_lb:
raise ValueError(
f"The specified frequency upper bound, {self.freq_ub}, is <= the frequency lower bound {self.freq_lb}. The upper bound must be greater than the lower bound."
)
# Set analysis directory if not specified.
if self.analysis_dir == Path("DEFAULT"):
name = self.input_file.stem
self.analysis_dir = Path("analysis_" + name)
for k, v in self.aset.items():
match = re.match("^[0123456]+$", str(v))
if match is None:
raise ValueError(
f"Got '{v}' for node '{k}' in the aset. Can only include DoFs 1-6."
)
for k, v in self.rvdof.items():
match = re.match("^[0123456]+$", str(v))
if match is None:
raise ValueError(
f"Got '{v}' for node '{k}' in the aset. Can only include DoFs 1-6."
)
if len(self.rvdof) > 0:
# Add aset to rvdof if defined.
for k in self.aset.keys():
self.rvdof[k] = self.aset[k]
return self
@property
def output_bdf(self) -> Path:
"""Retrieve the transformed bulk data filepath as a Path."""
# At this point we are gauranteed to have a non-None path
return self.analysis_dir.joinpath(self.input_file.name)
@property
def output_f06(self) -> str:
"""Retrieve the output F06 file as a Path."""
return self.input_file.stem + ".f06"
@property
def output_op2(self) -> Path:
"""Retrieve the output OP2 file as a string."""
return self.analysis_dir.joinpath("fem_bridge.op2")
@property
def n_modes_minus_aset(self) -> int | None:
"""The number of modes the user requests will be augmented aset modes.
We use this to fix that issue by subracting off, so the user gets the number of
modes that they want.
"""
# Return none if n_modes is None
if self.n_modes is None:
return None
extra_modes = 0
for v in self.aset.values():
extra_modes += len(str(v))
# Remove 6, as we will have typically have 6 rigid-body modes
if extra_modes >= 6:
extra_modes -= 6
return self.n_modes - extra_modes
@property
def n_modes_w_rv(self) -> int | None:
"""Retrive the number of modes + the number of residual vectors."""
# Return none if n_modes is None
if self.n_modes is None:
return None
# The number of residual vectors is the union of aset and rvdof.
if self.rvdof:
combined_dofs = deepcopy(self.aset)
for k, v in self.rvdof.items():
if k in combined_dofs:
# We have the same key in both sets. Need to get the DoFs of each and take their
# union
dofs = set()
for digit in str(v):
dofs.add(int(digit))
for digit in str(combined_dofs[k]):
dofs.add(int(digit))
sorted_dofs = sorted(list(dofs))
combined_dofs[k] = int("".join(map(str, sorted_dofs)))
else:
combined_dofs[k] = v
extra_modes = 0
for v in combined_dofs.values():
extra_modes += len(str(v))
return (
self.n_modes_minus_aset + extra_modes
) # pyright: ignore - will be non-None if n_modes is non-None
else:
return self.n_modes_minus_aset
[docs]
class StressRecoveryDS(DataStruct):
"""Parameters used for stress recovery.
Parameters
----------
input_bdf_file : Path
The modified BDF produced by NastranModalAnalysis when running with stress_recovery_op2 enabled.
input_mdf_file : Path
The input modal deformation file. TODO: Describe how to produce this.
input_op2_file : Path
The op2 file produced by NastranModalAnalysis when running with stress_recovery_op2 enabled.
output_op2_file : Path
The name of the output op2 file. This is the file that will contain the recovered stress data.
analysis_dir : Path
Analysis directory. If a path is not is not specified, will use analysis_INPUT_FILE_NAME.
"""
input_bdf_file: Path
input_mdf_file: Path
input_op2_file: Path
output_op2_file: Path
analysis_dir: Path = Path("DEFAULT")
[docs]
@model_validator(mode="after")
def postValidate(self):
"""Set the analysis directory if not specified."""
if self.analysis_dir == Path("DEFAULT"):
name = self.input_bdf_file.stem
self.analysis_dir = Path("analysis_" + name)
return self
@property
def output_bdf(self) -> Path:
"""Retrieve the transformed bulk data filepath as a Path."""
# At this point we are gauranteed to have a non-None path
return self.analysis_dir.joinpath(self.input_bdf_file.name)
@property
def output_f06(self) -> str:
"""Retrieve the output F06 file as a Path."""
return self.input_bdf_file.stem + ".f06"
@property
def input_mdf_file_rel(self) -> Path:
"""Input MDF file relative to the analysis_dir."""
return self.input_mdf_file.absolute().relative_to(
self.analysis_dir.absolute(), walk_up=True
)
@property
def input_op2_file_rel(self) -> Path:
"""Input MDF file relative to the analysis_dir."""
return self.input_op2_file.absolute().relative_to(
self.analysis_dir.absolute(), walk_up=True
)
from pyNastran.bdf.cards.bdf_sets import ABCQSet
[docs]
class RVDOF(ABCQSet):
"""
Defines degrees-of-freedom in the residual vector set.
+------+-----+----+-----+------+-----+----+-----+----+
| 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
+======+=====+====+=====+======+=====+====+=====+====+
| RVDOF | ID1 | C1 | ID2 | C2 | ID3 | C3 | ID4 | C4 |
+------+-----+----+-----+------+-----+----+-----+----+
| RVDOF | 16 | 2 | 23 | 3516 | 1 | 4 | | |
+------+-----+----+-----+------+-----+----+-----+----+
"""
type = "RVDOF"
_properties = ["node_ids"]
@classmethod
def _init_from_empty(cls):
ids = [1, 2]
components = ["123", "456"]
return RVDOF(ids, components, comment="")
def __init__(self, ids, components: list[str], comment=""):
"""Create an RVDOF card, which defines the degrees of freedom that will be used used for calculating residual vectors.
Parameters
----------
ids : list[int]
the GRID/SPOINT ids
components : list[str]
the degree of freedoms to be retained (e.g., '1', '123')
comment : str; default=''
a comment for the card
..note :: the length of components and ids must be the same
"""
ABCQSet.__init__(self, ids, components, comment)
[docs]
class NastranModalAnalysis:
"""Convert NASTRAN models into flexible body models.
This class processes a NASTRAN model in the following way:
* Using user-defined parameters, modify the NASTRAN run deck to retrieve the appropriate
mode shapes. These mode shapes are component mode shapes obtained using the
Craig Bampton method.
* Extra parameters such as residual vectors or modal integrals can also be obtained
by setting the appropriate parameters.
* Run NASTRAN on the modified run deck and save the results into an HDF5 file. This file
contains all of the critical data needed to create a PhysicalModalBody.
"""
def __init__(self, config: ModalAnalysisDS):
"""Create an instance of the NASTRAN class.
Parameters
----------
config : ModalAnalysisDS
The configuration used to modify the original run deck and run NASTRAN.
"""
self._c = config
self.nastran_executable = "nastran"
[docs]
def writeNewFile(self):
"""Modify the original run deck using the user-provided configuration."""
self._bdf = BDF(mode="nx")
self._bdf.read_bdf(self._c.input_file)
# Make sure CEND is capitalized, as pyNastran doesn't correctly handle cend.
if self._bdf.executive_control_lines[-1].lower() == "cend":
self._bdf.executive_control_lines[-1] = "CEND"
subcases = self._bdf.subcases
if len(subcases) > 1:
warn("More than one subcase detected. Using the first one.")
# Clear out spcs. We don't want SPC constraints, we want to use an aset.
self._bdf.spcs.clear()
self._bdf.spcadds.clear()
# Add params needed to print stuff out to the OP2 file
subcases[0].params["DISP"] = ["ALL", [], "STRESS-type"]
# Add grdpnt. Replace the old one if it exists.
self._bdf.add_param("GRDPNT", [self._c.grdpnt])
self._bdf.add_param("POST", [-1])
# Add an EIGRL card and METHOD. This is for normal modes.
eigrl_number_normal = 1
self._bdf.add_eigrl(
sid=eigrl_number_normal, # Set ID
v1=-5.0e16,
v2=5.0e16,
)
subcases[0].add_integer_type("METHOD", eigrl_number_normal)
# Add an eigrl card and RSMETHOD. This does component mode synthesis.
# TODO: This is for NX only, need to figure out what to do for MSC.
eigrl_number_cms = 100
self._bdf.add_eigrl(
sid=eigrl_number_cms, # Set ID
v1=self._c.freq_lb, # Lower frequency bound
v2=self._c.freq_ub, # Upper frequency bound
nd=self._c.n_modes_minus_aset, # Number of modes to extract
)
subcases[0].add_integer_type("RSMETHOD", eigrl_number_cms)
# Add aset nodes
if self._c.aset:
self._bdf.add_aset(list(self._c.aset.keys()), [str(x) for x in self._c.aset.values()])
# Remove any RESVEC calls in subcases
keys = []
for param in subcases[0].params:
if param.upper() == "RESVEC":
keys.append(param)
for key in keys:
subcases[0].params.pop(key)
# Add rvdof if defined
if self._c.rvdof:
rvdof = RVDOF(list(self._c.rvdof.keys()), [str(x) for x in self._c.rvdof.values()])
self._bdf.add_card([str(x) for x in rvdof.repr_fields()], "RVDOF")
subcases[0].params["RESVEC"] = ["YES", [], "STRESS-type"]
else:
subcases[0].params["RESVEC"] = ["NO", [], "STRESS-type"]
# Add QSET and SPOINT
# If residual vectors are defined, we need to increase the number of qset entries to accomodate them.
# This should be 6 * number of aset dofs + number of rv dofs. There may be intersection, so we need to
# get the union. This is done in ModalAnalysis n_modes_w_rv
if self._c.n_modes is None:
print(
"WARNING: No mode number given. Will create 500 QSET entries. If you need more, please specify n_modes."
)
modes_to_add = 500
else:
modes_to_add: int = (
self._c.n_modes_w_rv
) # pyright: ignore - Will be int if n_modes is not None
# Clear the existing qset and spoint cards
self._bdf.spoints.clear()
self._bdf.qsets.clear()
# We need to get a valid start point for SPOINT
start = max(self._bdf.point_ids) + 1
spoint_ids = [x for x in range(start, modes_to_add + start)]
self._bdf.add_spoint(spoint_ids)
self._bdf.add_qset(spoint_ids, "0")
# Add output coordinate system if defined
if self._c.output_cs is not None:
self._bdf.add_grdset(cp=0, cd=self._c.output_cs, ps="", seid=0)
# If we are running in stress recovery mode, add info to create op2 file
if self._c.stress_recovery_op2:
class MBDExportParam:
def write(self, _):
return "MBDEXPORT RECURDYN STANDARD FLEXBODY=YES RECVROP2=YES\n"
subcases[0].params["MBDEXPORT"] = [MBDExportParam(), [], "OBJ-type"]
fields = {"mass": "KG", "length": "M", "force": "N", "time": "S", "temp_stress": ""}
self._bdf.add_dti("UNITS", fields)
# Add executive control line to output HDF5
if self._c.stress_recovery_op2:
self._bdf.system_command_lines.append(
"ASSIGN OUTPUT2='fem_bridge.op2' UNIT=20 FORM=UNFORM"
)
else:
self._bdf.system_command_lines.append("ASSIGN OP2='fem_bridge.op2'")
os.makedirs(self._c.analysis_dir, exist_ok=True)
self._bdf.write_bdf(str(self._c.output_bdf))
[docs]
def runNastran(self):
"""Run NASTRAN on the output file. Must be run after writeNewFile."""
if not self._c.output_bdf.exists():
raise ValueError(
f"The file {self._c.output_bdf} does not exist. Did you already run writeNewFile?"
)
nas_run = run(
[self.nastran_executable, "batch=no", str(self._c.input_file.name)],
cwd=self._c.analysis_dir,
)
nas_run.check_returncode()
fatal_check = run(["grep", "FATAL", self._c.output_f06], cwd=self._c.analysis_dir)
if fatal_check.returncode != 1:
raise ValueError("FATAL NASTRAN error occured.")
[docs]
def processOP2(self):
"""Process the OP2 file and save the results to the H5 file."""
# Get BDF if it is not already open
if not hasattr(self, "_bdf"):
self._bdf = BDF(mode="nx")
self._bdf.read_bdf(self._c.input_file)
self._op2 = OP2()
self._op2.mode = "nx"
self._op2.read_op2(str(self._c.output_op2))
# Get the stiffness
dark = self._op2.eigenvalues
eig_vals = dark[list(dark.keys())[0]]
freq_rad = eig_vals.radians
stiffness = eig_vals.generalized_stiffness
n_rigid = np.argmax(freq_rad > self._c.rigid_mode_cutoff)
# Get the eigenvectors
dark = self._op2.eigenvectors
vecs = dark[list(dark.keys())[0]].data
# Get the rigid-body mass matrix
dark = self._op2.grid_point_weight
gpw = dark[list(dark.keys())[0]]
cg = gpw.cg # CG info
mass = gpw.mass[0]
b2cm = np.array([cg[2, 0], cg[2, 1], cg[1, 2]])
inertia = gpw.MO[-3:, -3:]
# Write the H5 file
with h5py.File(self._c.output_file, "w") as f:
# Stiffness vector (frequencies)
g = f.create_group("modal_analysis")
g.create_dataset("n_rigid_modes", data=n_rigid)
g.create_dataset("freq_rad", data=np.array(freq_rad))
g.create_dataset("stiffness", data=np.array(stiffness))
# Rigid body mass properties
g = f.create_group("rigid_mass_props")
g.create_dataset("mass", data=mass)
g.create_dataset("center_of_mass", data=b2cm)
g.create_dataset("inertia", data=inertia)
grids = f.create_group("grids")
count = 0
for name, grid in self._bdf.nodes.items():
# This comes out as a 6xn_modes matrix with translation first
nm = vecs[:, count, :].T
g = grids.create_group(str(name))
g.create_dataset("pos", data=grid.xyz)
g.create_dataset("nodal_matrix", data=np.vstack([nm[3:, :], nm[0:3, :]]))
count += 1
[docs]
class NastranStressRecovery:
"""Recover stress from a kdflex sim.
This class uses files produced by NastranModalAnalysis plus a modal deformation file
from a kdflex simulation, and combines them to recover the stresses on a body during
said simulation.
"""
def __init__(self, config: StressRecoveryDS):
"""Consructor.
Parameters
----------
config : StressRecoveryDS
The configuration used to recover stress.
"""
self._c = config
self.nastran_executable = "nastran"
[docs]
def writeNewFile(self):
"""Modify the given run deck using the user-provided configuration."""
self._bdf = BDF(mode="nx")
self._bdf.read_bdf(self._c.input_bdf_file)
subcase = self._bdf.subcases[0]
# Add params needed to print stuff out to the OP2 file
subcase.params["DISPLACEMENT"] = ["ALL", [], "STRESS-type"]
subcase.params["STRESS"] = ["ALL", [], "STRESS-type"]
subcase.params["FORCE"] = ["ALL", [], "STRESS-type"]
subcase.params["STRAIN"] = ["ALL", [], "STRESS-type"]
# Remove param related to MBDEXPORT
pop_keys = []
for k, v in subcase.params.items():
if isinstance(v, list) and isinstance(v[0], str) and "MBDEXPORT" in v[0].upper():
pop_keys.append(k)
for k in pop_keys:
subcase.params.pop(k)
# Add the ASCII param to read in the MDF
class MBDRecvrParam:
def write(self, _):
return "MBDRECVR ASCII\n"
subcase.params["MBDRECVR"] = [MBDRecvrParam(), [], "OBJ-type"]
# Remove RSMETHOD if it exists
eigrl_number_cms = 100
if subcase.has_parameter("RSMETHOD")[0]:
subcase.params.pop("RSMETHOD")
self._bdf._add_methods.model.methods.pop(eigrl_number_cms)
# Move all include files into the same directory
self._bdf.add_include_file(str(self._c.input_mdf_file_rel))
# Add INPUTT2 file
line = (
",\n".join(wrap(f"ASSIGN INPUTT2='{self._c.input_op2_file_rel}' ,", 71))
+ "\nUNIT=20 FORM=UNFORM"
)
self._bdf.system_command_lines.append(line)
# Remove the OUTPUT2 line
indices = []
for k, line in enumerate(self._bdf.system_command_lines):
if "OUTPUT2" in line.upper():
indices.append(k)
indices.reverse()
for idx in indices:
self._bdf.system_command_lines.pop(idx)
os.makedirs(self._c.analysis_dir, exist_ok=True)
self._bdf.write_bdf(str(self._c.output_bdf))
[docs]
def runNastran(self):
"""Run NASTRAN on the output file. Must be run after writeNewFile."""
if not self._c.output_bdf.exists():
raise ValueError(
f"The file {self._c.output_bdf} does not exist. Did you already run writeNewFile?"
)
nas_run = run(
[self.nastran_executable, "batch=no", str(self._c.input_bdf_file.name)],
cwd=self._c.analysis_dir,
)
nas_run.check_returncode()
fatal_check = run(["grep", "FATAL", self._c.output_f06], cwd=self._c.analysis_dir)
if fatal_check.returncode != 1:
raise ValueError("FATAL NASTRAN error occured.")
[docs]
def writeMdfFile(ds: h5py.Dataset, output_mdf_file: Path | str) -> None:
"""Write an MDF file given flex body simulation data.
Parameters
----------
ds : h5py.Group
The group that contains the flexible body information.
output_mdf_file : Path | str
The file to write the MDF data to.
"""
# TODO: Do we need to add in rigid body modes here to capture inertial forces?
# Get data from the H5 file
t = ds["t"][:]
q = ds["Q"][:].T
u = ds["U"][:].T
udot = ds["Udot"][:].T
# Create a model to hold the data
model = BDF(mode="nx")
# Add the DTI entry for time
model.add_dti(
name="TOL",
fields={0: [t.size, 1, 2, 1, 1, 10000] + t.flatten().tolist() + ["ENDREC"]},
comment="time",
)
# Put data into the appropriate matrix. This is n_modes x n_time * 3. This stacks
# the modal coordinates, velocities, and accelerations to form the appropriate UH%
# matrix.
uht = np.transpose(np.stack((q, u, udot), axis=0), (1, 2, 0)).reshape(q.shape[0], -1)
uht = np.vstack([np.zeros((6, uht.shape[1])), uht])
J, I = np.indices((uht.shape[1], uht.shape[0]))
J += 1
I += 1
model.add_dmi(
"UHT", 2, 1, 0, uht.shape[0], uht.shape[1], J.flatten(), I.flatten(), uht.T.flatten()
)
# Add the BHT matrix. This is just a matrix of zeros of the appropriate size
zeros = np.zeros((6, uht.shape[1]))
J, I = np.indices((zeros.shape[1], zeros.shape[0]))
J += 1
I += 1
model.add_dmi("BHT", 2, 1, 0, 6, uht.shape[1], J.flatten(), I.flatten(), zeros.T.flatten())
# Write out the model the user-specified file.
model.write_bdf(str(output_mdf_file))