Source code for Karana.KUtils.FilesystemLikeDatabase

# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.

"""Classes and functions related to Filesystem."""

from fnmatch import fnmatch
from typing import overload
from weakref import ref
from networkx import DiGraph
from collections import deque


[docs] class FilesystemLikeDatabase[T]: """Filesystem-like database for storing objects and looking them up by filepath-like names.""" def __init__(self): """Create an instance of FilesystemLikeDatabase.""" self.graph = DiGraph() self.root = "/" self.graph.add_node(id(self.root), name="/", obj_ref=None) @overload def addObject(self, name: str, obj: T, parent: str, /): """Add an object to the database. Parameters ---------- name : str The name of the object. obj : T The object itself. parent : str The parent as a path in the current database. """ ... @overload def addObject(self, obj: T, parent: str, /): """Add an object to the database. Parameters ---------- obj : T The object itself. The object must implement a name() method that returns a string. parent : str The parent as a path in the current database. """ ... @overload def addObject(self, name: str, obj: T, parent: T, /): """Add an object to the database. Parameters ---------- name : str The name of the object. obj : T The object itself. parent : T The parent object, which must already be in the database. """ ... @overload def addObject(self, obj: T, parent: T, /): """Add an object to the database. Parameters ---------- obj : T The object itself. The object must implement a name() method that returns a string. parent : T The parent object, which must already be in the database. """ ...
[docs] def addObject(self, *args): """Add an object to the database. 1. Using a name, object, and parent as a path. Parameters ---------- name : str The name of the object. obj : T The object itself. parent : str The parent as a path in the current database. 2. Using an object with a name method and the parent as a path. Parameters ---------- obj : T The object itself. The object must implement a name() method that returns a string. parent : str The parent as a path in the current database. 3. Using a name, an object, and the parent as an object. Parameters ---------- name : str The name of the object. obj : T The object itself. parent : T The parent object, which must already be in the database. 4. Using an object with a name method and the parent as an object. Parameters ---------- obj : T The object itself. The object must implement a name() method that returns a string. parent : T The parent object, which must already be in the database. """ if len(args) == 2: obj = args[0] parent = args[1] if isinstance(parent, str): parent_id = self._resolveSingle(args[1]) else: parent_id = self._getNode(args[1]) if not hasattr(obj, "name"): raise ValueError(f"Object {obj} has no attribute name.") name: str = obj.name() self._addObject(name, obj, parent_id) elif len(args) == 3: name = args[0] obj = args[1] parent = args[2] if isinstance(parent, str): parent_id = self._resolveSingle(parent) else: parent_id = self._getNode(parent) self._addObject(name, obj, parent_id)
[docs] def getObjects(self, path: str, parent: T | None = None) -> list[T]: """Get objects in the database. Parameters ---------- path : str The path to use to find objects. parent : T | None The parent to start searching from. If None, then the root is used. Returns ------- list[T] A list of objects found. """ if parent is not None: parent_id = self._getNode(parent) else: parent_id = id(self.root) objs: list[T] = [] for x in self._resolve(path, parent_id): obj = self._getObject(x) if obj is not None: objs.append(obj) return objs
[docs] def getObject(self, path: str, parent: T | None = None) -> T: """Get object in the database. Parameters ---------- path : str The path to use to find the object. parent : T | None The parent to start searching from. If None, then the root is used. Returns ------- T The object that matches the path. """ dark = self.getObjects(path, parent) if len(dark) == 1: return dark[0] elif len(dark) > 1: raise ValueError(f"Found non-unique object matching {path}.") else: raise ValueError(f"Did not find any objects matching {path}.")
def _getObject(self, obj_id: int) -> T | None: """Get the object at the given ID. Parameters ---------- obj_id : int The ID of the object to get. Returns ------- T | None The object. """ obj_ref = self.graph.nodes[obj_id]["obj_ref"] if obj_ref is not None: return obj_ref() else: return None def _getNode(self, obj: T) -> int: """Get the node associated with a given object. Parameters ---------- obj : T The object to find the node for. Returns ------- int The integer of the node associated with that object. """ return id(obj) def _addObject(self, name: str, obj: T, parent: int): """Add an object to the database. Parameters ---------- name : str The name of the object ot add. obj : T The object itself. parent : int The ID of the parent node to attach to. """ node_id = id(obj) self.graph.add_node( node_id, name=name, obj_ref=ref(obj), ) self.graph.add_edge(parent, node_id) def _resolve(self, path: str, parent: int | None = None) -> list[int]: """Find all the nodes at a given path. Parameters ---------- path : str The path to use to find node(s). parent : int | None The parent to start the path traversal from. Returns ------- list[int] A list of all nodes at the given path. """ if parent is None: if path[0] != "/": raise ValueError("Parent is None, but path is not absolute.") start = id(self.root) else: start = parent parts = [p for p in path.split("/") if p] # Each state is (node, index into parts) queue = deque([(start, 0)]) results: set[int] = set() while queue: node, i = queue.popleft() # Finished matching the pattern if i == len(parts): results.add(node) continue part = parts[i] if part == ".": queue.append((node, i + 1)) elif part == "..": preds = list(self.graph.predecessors(node)) nd_parent = preds[0] if preds else node queue.append((nd_parent, i + 1)) elif part == "**": # Case 1: ** matches zero components queue.append((node, i + 1)) # Case 2: ** matches one or more components for child in self.graph.successors(node): queue.append((child, i)) else: for child in self.graph.successors(node): name = self.graph.nodes[child]["name"] if fnmatch(name, part): queue.append((child, i + 1)) return list(results) def _resolveSingle(self, path: str, parent: int | None = None) -> int: """Find the node at the given path. Parameters ---------- path : str The path to use to find node(s). parent : int | None The parent to start the path traversal from. Returns ------- int The node at the given path. """ matches = self._resolve(path, parent=parent) if len(matches) != 1: raise ValueError(f"Expected 1 match for '{path}', got {len(matches)}") return matches[0]
[docs] def getClosest(self, start: T, name: str) -> T: """Get the closest object with the given name. If more than one object has this name at the same closest distance, then an error is thrown. Parameters ---------- start : T Where to start searching from. name : str The name of the object to look for. This can use globs and wildcards. Returns ------- T The closest object with that name. """ visited: set[int] = set() visited.add(id(start)) queue = deque([(id(start), 0)]) matches = [] found_distance = None while queue: node, dist = queue.popleft() # If we've gone past the first match distance, stop if found_distance is not None and dist > found_distance: break node_name = self.graph.nodes[node]["name"] if fnmatch(node_name, name): matches.append(node) found_distance = dist continue # Expand neighbors neighbors = list(self.graph.successors(node)) parent = next(self.graph.predecessors(node), None) if parent is not None: neighbors.append(parent) for n in neighbors: if n not in visited: visited.add(n) queue.append((n, dist + 1)) if not matches: raise ValueError(f"No node named '{name}' found") if len(matches) > 1: raise ValueError( f"Multiple nodes named '{name}' at distance {found_distance} were found." ) obj = self.graph.nodes[matches[0]]["obj_ref"]() if obj is None: raise ValueError("Found unique match, but the Python weakref no longer exists.") else: return obj
[docs] def dumpTree(self, node: int | None = None, prefix: str = "", is_last: bool = True): """Display the database as a tree on the command line. This is a recursive method that is meant to be called with the keyword arguments as their defaults. Parameters ---------- node : int | None The node to dump from. prefix : str The prefix to use when printing this nodes information. is_last : bool If this is the last node at the current level. """ if node is None: node = id(self.root) print(self.graph.nodes[node]["name"]) new_prefix = prefix else: connector = "└── " if is_last else "├── " print(prefix + connector + self.graph.nodes[node]["name"]) new_prefix = prefix + (" " if is_last else "│ ") children = list(self.graph.successors(node)) for i, child in enumerate(children): last = i == len(children) - 1 self.dumpTree(child, new_prefix, last)