Source code for Karana.KUtils.multirun._runner
# Copyright (c) 2024-2025 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""This module provides runners used to execute runs as well as related data classes."""
from abc import abstractmethod, ABC
import asyncio
from pathlib import Path
from dataclasses import dataclass
from ._types import RunStatus, ParamMap
# Public API of this module (names exported by ``from ... import *``).
__all__ = [
    "Run",
    "AbstractRunner",
    "RunStatistics",
    "ParallelLocalRunner",
]
[docs]
@dataclass
class Run:
    """Handle to a single run: its scheduled task and its current status.

    Attributes
    ----------
    task : asyncio.Task | None
        The awaitable task executing the run, or None until a runner
        assigns one.
    status : RunStatus
        Lifecycle state of the run, one of:

        - "skipped": the run will not be executed
        - "pending": the run is queued and waiting to execute
        - "running": the run is executing right now
        - "success": the run finished without error
        - "failure": the run finished with an error
    """

    # None until a runner schedules the run and stores its task here.
    task: asyncio.Task | None = None
    # Freshly created runs start out queued.
    status: RunStatus = "pending"
[docs]
@dataclass
class RunStatistics:
    """Counts of runs, broken down by status.

    Attributes
    ----------
    skipped_count : int
        How many runs have status "skipped"
    pending_count : int
        How many runs have status "pending"
    running_count : int
        How many runs have status "running"
    success_count : int
        How many runs have status "success"
    failure_count : int
        How many runs have status "failure"
    complete_count : int
        How many runs are in a terminal status ("skipped", "success" or
        "failure")
    total_count : int
        How many runs exist in total, regardless of status
    """

    # Per-status tallies; every counter starts at zero.
    skipped_count: int = 0
    pending_count: int = 0
    running_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    # Aggregate tallies derived from the per-status ones.
    complete_count: int = 0
    total_count: int = 0
[docs]
class AbstractRunner(ABC):
    """Abstract interface that every runner implementation must provide.

    A runner accepts commands through ``add``, reports progress through
    ``computeStatistics`` and lets callers wait for completion through
    ``gather``.
    """

    @abstractmethod
    def computeStatistics(self) -> RunStatistics:
        """Summarize the current state of the runs.

        Returns
        -------
        RunStatistics
            Per-status counts of the runs known to this runner
        """

    @abstractmethod
    def add(self, cmd: list[str], cwd: Path = Path(".")) -> Run:
        """Register a new run.

        Parameters
        ----------
        cmd : list[str]
            The command to execute
        cwd : Path
            Directory the run executes in; the current directory by default.

        Returns
        -------
        Run
            Handle to the run's task and metadata
        """

    @abstractmethod
    async def gather(self):
        """Block until every run has finished."""
[docs]
class ParallelLocalRunner(AbstractRunner):
    """Runner implementation that does runs in parallel locally.

    Each run is executed as a subprocess started with
    ``asyncio.create_subprocess_exec``. At most ``max_parallel`` subprocesses
    execute at once; the remaining runs wait on a shared semaphore with
    status "pending".
    """

    def __init__(self, *, max_parallel: int = 4):
        """Create a ParallelLocalRunner instance.

        Parameters
        ----------
        max_parallel: int
            Maximum number of parallel runs. Defaults to 4.

        Raises
        ------
        ValueError
            If ``max_parallel`` is less than 1. A value of 0 would otherwise
            make every run wait on the semaphore forever.
        """
        if max_parallel < 1:
            raise ValueError(f"max_parallel must be at least 1, got {max_parallel}")
        self.semaphore = asyncio.Semaphore(value=max_parallel)
        # Every run ever added, in insertion order; read by computeStatistics()
        # and gather().
        self.runs: list[Run] = []

    def computeStatistics(self) -> RunStatistics:
        """Compute statistics about the runs.

        Returns
        -------
        RunStatistics
            Per-status counts over all runs added so far. ``complete_count``
            is the number of runs in a terminal status ("skipped", "success"
            or "failure").
        """
        stats = RunStatistics(total_count=len(self.runs))
        for run in self.runs:
            if run.status == "skipped":
                stats.skipped_count += 1
            elif run.status == "pending":
                stats.pending_count += 1
            elif run.status == "running":
                stats.running_count += 1
            elif run.status == "success":
                stats.success_count += 1
            elif run.status == "failure":
                stats.failure_count += 1
        stats.complete_count = (
            stats.skipped_count + stats.success_count + stats.failure_count
        )
        return stats

    def add(self, cmd: list[str], cwd: Path = Path(".")) -> Run:
        """Add a run and schedule it on the running event loop.

        The run's stdout and stderr are written (truncating any existing
        content) to ``stdout.txt`` and ``stderr.txt`` inside ``cwd``. Runs
        that execute concurrently should therefore use distinct directories.

        Parameters
        ----------
        cmd : list[str]
            The command to run. ``cmd[0]`` is the executable; the remaining
            items are its arguments. The list is copied, so mutating it after
            this call does not affect the run.
        cwd : Path
            The working directory to run it in. A ``str`` is also accepted.
            Defaults to the current directory.

        Returns
        -------
        Run
            A Run instance holding the scheduled task.

        Raises
        ------
        RuntimeError
            If no event loop is running (raised by ``asyncio.create_task``).
        """
        run = Run()
        # Copy cmd: it is only unpacked once a semaphore slot is acquired,
        # which may be long after this call returns.
        run.task = asyncio.create_task(
            self._doRun(cmd=list(cmd), cwd=Path(cwd), run=run)
        )
        self.runs.append(run)
        return run

    async def _doRun(self, cmd: list[str], cwd: Path, run: Run):
        """Execute one run once a concurrency slot is available.

        Sets ``run.status`` to "running" while the subprocess executes. It is
        then set to "success" (exit code 0) or "failure" (non-zero exit code,
        or any exception or cancellation, which is re-raised). A run whose
        status is "skipped" when it obtains a slot is not executed.
        """
        async with self.semaphore:
            if run.status == "skipped":
                # Honour the documented meaning of "skipped": never execute it.
                return
            run.status = "running"
            stdout_path = cwd / "stdout.txt"
            stderr_path = cwd / "stderr.txt"
            proc = None
            try:
                with stdout_path.open("wb") as stdout, stderr_path.open("wb") as stderr:
                    proc = await asyncio.create_subprocess_exec(
                        *cmd, stdout=stdout, stderr=stderr, cwd=cwd
                    )
                    returncode = await proc.wait()
            except BaseException:
                # BaseException also covers asyncio.CancelledError, so a
                # cancelled run reaches a terminal status instead of staying
                # "running" forever.
                run.status = "failure"
                if proc is not None and proc.returncode is None:
                    # Do not leave an orphaned child process behind.
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass
                raise
            run.status = "success" if returncode == 0 else "failure"

    async def gather(self):
        """Wait for all runs to complete.

        Every run added before this call is awaited, even if some of them
        raise. After all of them have finished, the first exception (in the
        order the runs were added) is re-raised.
        """
        results = await asyncio.gather(
            *(run.task for run in self.runs), return_exceptions=True
        )
        for result in results:
            if isinstance(result, BaseException):
                raise result