Source code for Karana.KUtils.multirun._runner

# Copyright (c) 2024-2025 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""This module provides runners used to execute runs as well as related data classes."""

from abc import abstractmethod, ABC
import asyncio
from pathlib import Path
from dataclasses import dataclass

from ._types import RunStatus, ParamMap

# Names that make up this module's public API (what ``from ... import *`` exports).
__all__ = ["Run", "AbstractRunner", "RunStatistics", "ParallelLocalRunner"]


@dataclass
class Run:
    """Pair a run's asyncio task with the run's current status.

    Attributes
    ----------
    task: asyncio.Task | None
        The awaitable task for this run; ``None`` until a task is assigned.
    status: RunStatus
        Where the run currently is in its lifecycle. One of

        - "skipped": the run will not be executed
        - "pending": the run is waiting to be executed
        - "running": the run is being executed right now
        - "success": the run finished without error
        - "failure": the run finished with an error
    """

    # No task has been attached when a Run is first created.
    task: asyncio.Task | None = None
    # Every Run starts out waiting to be executed.
    status: RunStatus = "pending"
@dataclass
class RunStatistics:
    """Counters summarising the status of a collection of runs.

    All counters start at zero.

    Attributes
    ----------
    skipped_count: int
        How many runs have the "skipped" status
    pending_count: int
        How many runs have the "pending" status
    running_count: int
        How many runs have the "running" status
    success_count: int
        How many runs have the "success" status
    failure_count: int
        How many runs have the "failure" status
    complete_count: int
        How many runs have reached any terminal status
    total_count: int
        How many runs there are in total
    """

    # Per-status counters.
    skipped_count: int = 0
    pending_count: int = 0
    running_count: int = 0
    success_count: int = 0
    failure_count: int = 0

    # Aggregate counters.
    complete_count: int = 0
    total_count: int = 0
class AbstractRunner(ABC):
    """Abstract interface that every runner class must implement."""

    @abstractmethod
    def computeStatistics(self) -> RunStatistics:
        """Summarise the current state of the runs.

        Returns
        -------
        RunStatistics
            Container holding the statistics about the runs
        """

    @abstractmethod
    def add(self, cmd: list[str], cwd: Path = Path(".")) -> Run:
        """Register a new run.

        Parameters
        ----------
        cmd: list[str]
            The command to execute
        cwd: Path
            The directory to run in. Defaults to the current directory.

        Returns
        -------
        Run
            Handle to the run's task and metadata
        """

    @abstractmethod
    async def gather(self):
        """Wait until every run has completed."""
class ParallelLocalRunner(AbstractRunner):
    """Runner implementation that does runs in parallel locally.

    Each run is executed as a local subprocess inside its own asyncio task.
    A semaphore limits how many subprocesses execute at the same time.
    """

    def __init__(self, *, max_parallel: int = 4):
        """Create a ParallelLocalRunner instance.

        Parameters
        ----------
        max_parallel: int
            Maximum number of parallel runs. Defaults to 4. Must be at least 1.

        Raises
        ------
        ValueError
            If ``max_parallel`` is less than 1.
        """
        # A semaphore with value 0 can never be acquired, so every run would
        # wait forever; reject that up front instead of hanging silently.
        if max_parallel < 1:
            raise ValueError(f"max_parallel must be at least 1, got {max_parallel}")
        self.semaphore = asyncio.Semaphore(value=max_parallel)
        self.runs: list[Run] = []

    def computeStatistics(self) -> RunStatistics:
        """Compute statistics about the runs added so far.

        Runs with the "skipped", "success" or "failure" status are counted as
        complete.

        Returns
        -------
        RunStatistics
            The statistics about the runs.
        """
        stats = RunStatistics()
        for run in self.runs:
            stats.total_count += 1
            # A run has exactly one status, so the branches are exclusive.
            if run.status == "skipped":
                stats.skipped_count += 1
                stats.complete_count += 1
            elif run.status == "pending":
                stats.pending_count += 1
            elif run.status == "running":
                stats.running_count += 1
            elif run.status == "success":
                stats.success_count += 1
                stats.complete_count += 1
            elif run.status == "failure":
                stats.failure_count += 1
                stats.complete_count += 1
        return stats

    def add(self, cmd: list[str], cwd: Path = Path(".")) -> Run:
        """Add a run and schedule it for execution.

        The task is created with ``asyncio.create_task``, so this must be
        called while an event loop is running.

        Parameters
        ----------
        cmd : list[str]
            The command to run.
        cwd : Path
            The working directory to run it in. Defaults to the current
            directory.

        Returns
        -------
        Run
            A Run instance with its task assigned.
        """
        run = Run()
        run.task = asyncio.create_task(self._doRun(cmd=cmd, cwd=cwd, run=run))
        self.runs.append(run)
        return run

    async def _doRun(self, cmd: list[str], cwd: Path, run: Run) -> None:
        """Execute one run as a subprocess and record its outcome on ``run``.

        Waits for a semaphore slot, then runs ``cmd`` in ``cwd`` with its
        standard output and standard error written to ``stdout.txt`` and
        ``stderr.txt`` inside ``cwd``.

        Parameters
        ----------
        cmd : list[str]
            The command to run.
        cwd : Path
            The working directory; also where the output files are written.
        run : Run
            The handle whose status is updated as the run progresses.

        Raises
        ------
        Exception
            Any exception raised while opening the output files or while
            starting or waiting for the subprocess is re-raised after the
            run has been marked as a failure.
        """
        async with self.semaphore:
            run.status = "running"
            stdout_path = cwd / "stdout.txt"
            stderr_path = cwd / "stderr.txt"
            try:
                with stdout_path.open("wb") as stdout, stderr_path.open("wb") as stderr:
                    proc = await asyncio.create_subprocess_exec(
                        *cmd, stdout=stdout, stderr=stderr, cwd=cwd
                    )
                    await proc.wait()
            except Exception:
                # "failure" is the status computeStatistics counts; any other
                # value would leave this run out of the failure/complete totals.
                run.status = "failure"
                raise
            else:
                # A non-zero exit code means the command itself reported an error.
                run.status = "success" if proc.returncode == 0 else "failure"

    async def gather(self):
        """Wait for all runs to complete.

        Awaits every run's task via ``asyncio.gather``; an exception raised
        by a run propagates to the caller.
        """
        await asyncio.gather(*(run.task for run in self.runs))