Source code for Karana.KUtils.multirun._multirun

# Copyright (c) 2024-2025 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""This module provides the high level function, multirun, to run a command multiple times with different parameters in distinct directories."""

import asyncio
from collections.abc import Iterable
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import shutil
import subprocess
import time
import tomllib
from typing import Any, Literal

from ._types import FileLinkage, ParamValue, ParamMap
from ._runner import AbstractRunner, ParallelLocalRunner
from ._display import displayStatus

__all__ = ["FileResource", "multirun", "multirunAsync"]


@dataclass
class FileResource:
    """Describe an existing file that the runs need access to.

    Attributes
    ----------
    path: Path
        Location of the existing file
    linkage: multirun.FileLinkage
        Strategy used to expose the file to the runs
        - "copy": every run directory receives its own copy of the file
        - "shared": one copy is kept in a common area and symlinked from each run
        - "link": a symlink leading to the given file path is used
    rename: str | Path | None
        Filename or path the file should have inside the run directory
    """

    path: Path
    linkage: FileLinkage = "shared"
    rename: str | Path | None = None

    @property
    def name(self) -> Path:
        """Return the name the file has inside a run directory.

        Returns
        -------
        Path
            ``rename`` when it is set to a truthy value, otherwise the final
            component of ``path``
        """
        if self.rename:
            return Path(self.rename)
        return Path(self.path.name)
# Strong references to the status-display tasks. The event loop itself only
# keeps weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_status_tasks: set[asyncio.Task] = set()


def _tomlValue(value: Any) -> str:
    """Render a parameter value as the right-hand side of a TOML key/value pair.

    Parameters
    ----------
    value: Any
        The parameter value to render

    Returns
    -------
    str
        TOML text for booleans, strings and (nested) lists/tuples; ``repr`` of
        the value for everything else
    """
    # bool is checked first: repr() yields "True"/"False", which TOML rejects.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        import json

        # JSON string escapes are a subset of the TOML basic-string escapes.
        # DEL is the one control character JSON leaves raw but TOML forbids.
        return json.dumps(value, ensure_ascii=False).replace("\x7f", "\\u007f")
    if isinstance(value, (list, tuple)):
        return "[" + ", ".join(_tomlValue(item) for item in value) + "]"
    return repr(value)


def _copyResource(source: Path, destination: Path) -> None:
    """Copy a file or a directory tree to ``destination``, creating missing parent directories.

    Parameters
    ----------
    source: Path
        The existing file or directory
    destination: Path
        Where the copy is placed
    """
    destination.parent.mkdir(parents=True, exist_ok=True)
    # This would be simpler but requires python 3.14:
    #     source.copy(destination, follow_symlinks=False)
    if source.is_dir():
        shutil.copytree(source, destination, symlinks=True)
    else:
        shutil.copy(source, destination)


async def multirunAsync(
    *,
    runs: Iterable[ParamMap],
    cmd: list[str],
    output_path: Path | str | None = None,
    input_files: list[FileResource | Path | str] | None = None,
    runner: AbstractRunner | None = None,
):
    """Run a command with varying parameters in distinct directories.

    The output directory is laid out as ``files/`` (resources shared between
    runs) and ``runs/run_NNNNNN/`` (one directory per entry of ``runs``, each
    holding its input files and a ``param_map.toml``).

    Parameters
    ----------
    runs: Iterable[ParamMap]
        The parameters for each run. Parameters whose value is None are not
        written to ``param_map.toml``.
    cmd: list[str]
        The command to run
    output_path: Path | str | None
        A new directory to store the runs in. Defaults to a timestamped
        subdirectory in the current directory.
    input_files: list[FileResource | Path | str] | None
        A list of external files to copy/link into each run directory. If
        given as a Path or str, will be converted to a FileResource with
        default attributes. The list itself is not modified.
    runner: AbstractRunner | None
        A runner used to execute the runs. Defaults to using
        multirun.ParallelLocalRunner.

    Raises
    ------
    FileExistsError
        If ``output_path`` already exists
    """
    if output_path is None:
        stamp = datetime.now().strftime("%Y_%m_%d__%H_%M_%S")
        output_path = f"multirun_{stamp}"
    output_path = Path(output_path)

    # Normalise into a new list so the caller's list is left untouched
    resources = [
        entry if isinstance(entry, FileResource) else FileResource(path=Path(entry))
        for entry in (input_files or [])
    ]

    if runner is None:
        runner = ParallelLocalRunner()

    # We require a clean start, so make sure the path doesn't exist
    output_path.mkdir(exist_ok=False)

    # Create an area for files that are shared between runs
    shared_files_path = output_path / "files"
    shared_files_path.mkdir()

    # Copy/link files shared between runs ("copy" resources are handled per run)
    for resource in resources:
        shared_path = shared_files_path / resource.name
        if resource.linkage == "link":
            # Create a symlink to the external file in the shared files area
            shared_path.parent.mkdir(parents=True, exist_ok=True)
            shared_path.symlink_to(resource.path.absolute())
        elif resource.linkage == "shared":
            # Copy the external file into the shared files area
            _copyResource(resource.path, shared_path)

    # Keep a strong reference until the task is done (see _status_tasks)
    status_task = asyncio.create_task(displayStatus(runner))
    _status_tasks.add(status_task)
    status_task.add_done_callback(_status_tasks.discard)

    runs_path = output_path / "runs"
    last_yield = time.monotonic()
    for index, param_map in enumerate(runs):
        run_path = runs_path / f"run_{index:06d}"
        run_path.mkdir(parents=True)

        # Prepare file resources for the run
        for resource in resources:
            local_path = run_path / resource.name
            if resource.linkage == "copy":
                # Copy the external file into the run directory
                _copyResource(resource.path, local_path)
            else:
                # Relative symlink to the file or symlink in the shared file area
                shared_path = shared_files_path / resource.name
                local_path.parent.mkdir(parents=True, exist_ok=True)
                rel_target = shared_path.relative_to(local_path.parent, walk_up=True)
                local_path.symlink_to(rel_target)

        # Write the param map to a toml file in the run directory.
        # TOML documents are UTF-8, so do not rely on the locale encoding.
        param_path = run_path / "param_map.toml"
        with param_path.open("w", encoding="utf-8") as f:
            for param_name, param_value in param_map.items():
                # None parameters are omitted (TOML has no null literal)
                if param_value is None:
                    continue
                print(f"{param_name} = {_tomlValue(param_value)}", file=f)

        runner.add(cmd=cmd, cwd=run_path)

        # Setting up the runs is synchronous work; roughly every 0.01 seconds
        # hand control back to the event loop so other tasks can run.
        # asyncio.sleep(0) yields without introducing a minimum delay.
        if time.monotonic() - last_yield >= 0.01:
            await asyncio.sleep(0)
            last_yield = time.monotonic()

    await runner.gather()
def multirun(
    *,
    runs: Iterable[ParamMap],
    cmd: list[str],
    output_path: Path | str | None = None,
    input_files: list[FileResource | Path | str] | None = None,
    runner: AbstractRunner | None = None,
):
    """Synchronous entry point: run a command with varying parameters in distinct directories.

    All arguments are forwarded unchanged to ``multirunAsync``, which is
    driven to completion with ``asyncio.run``.

    Parameters
    ----------
    runs: Iterable[ParamMap]
        The parameters for each run
    cmd: list[str]
        The command to run
    output_path: Path | str | None
        A new directory to store the runs in. Defaults to a timestamped
        subdirectory in the current directory.
    input_files: list[FileResource | Path | str] | None
        A list of external files to copy/link into each run directory
    runner: AbstractRunner | None
        A runner used to execute the runs. Defaults to using
        multirun.ParallelLocalRunner.
    """
    pending = multirunAsync(
        runs=runs,
        cmd=cmd,
        output_path=output_path,
        input_files=input_files,
        runner=runner,
    )
    asyncio.run(pending)