# Copyright (c) 2024-2026 Karana Dynamics Pty Ltd. All rights reserved.
#
# NOTICE TO USER:
#
# This source code and/or documentation (the "Licensed Materials") is
# the confidential and proprietary information of Karana Dynamics Inc.
# Use of these Licensed Materials is governed by the terms and conditions
# of a separate software license agreement between Karana Dynamics and the
# Licensee ("License Agreement"). Unless expressly permitted under that
# agreement, any reproduction, modification, distribution, or disclosure
# of the Licensed Materials, in whole or in part, to any third party
# without the prior written consent of Karana Dynamics is strictly prohibited.
#
# THE LICENSED MATERIALS ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND.
# KARANA DYNAMICS DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO WARRANTIES OF MERCHANTABILITY, NON-INFRINGEMENT, AND
# FITNESS FOR A PARTICULAR PURPOSE.
#
# IN NO EVENT SHALL KARANA DYNAMICS BE LIABLE FOR ANY DAMAGES WHATSOEVER,
# INCLUDING BUT NOT LIMITED TO LOSS OF PROFITS, DATA, OR USE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, WHETHER IN CONTRACT, TORT,
# OR OTHERWISE ARISING OUT OF OR IN CONNECTION WITH THE LICENSED MATERIALS.
#
# U.S. Government End Users: The Licensed Materials are a "commercial item"
# as defined at 48 C.F.R. 2.101, and are provided to the U.S. Government
# only as a commercial end item under the terms of this license.
#
# Any use of the Licensed Materials in individual or commercial software must
# include, in the user documentation and internal source code comments,
# this Notice, Disclaimer, and U.S. Government Use Provision.
"""Classes and functions related to Filesystem."""
from fnmatch import fnmatch
from typing import overload
from weakref import ref
from networkx import DiGraph
from collections import deque
class FilesystemLikeDatabase[T]:
"""Filesystem-like database for storing objects and looking them up by filepath-like names."""
def __init__(self):
"""Create an instance of FilesystemLikeDatabase."""
self.graph = DiGraph()
self.root = "/"
self.graph.add_node(id(self.root), name="/", obj_ref=None)
@overload
def addObject(self, name: str, obj: T, parent: str, /):
"""Add an object to the database.
Parameters
----------
name : str
The name of the object.
obj : T
The object itself.
parent : str
The parent as a path in the current database.
"""
...
@overload
def addObject(self, obj: T, parent: str, /):
"""Add an object to the database.
Parameters
----------
obj : T
The object itself. The object must implement a name() method that
returns a string.
parent : str
The parent as a path in the current database.
"""
...
@overload
def addObject(self, name: str, obj: T, parent: T, /):
"""Add an object to the database.
Parameters
----------
name : str
The name of the object.
obj : T
The object itself.
parent : T
The parent object, which must already be in the database.
"""
...
@overload
def addObject(self, obj: T, parent: T, /):
"""Add an object to the database.
Parameters
----------
obj : T
The object itself. The object must implement a name() method that
returns a string.
parent : T
The parent object, which must already be in the database.
"""
...
[docs]
def addObject(self, *args):
"""Add an object to the database.
1. Using a name, object, and parent as a path.
Parameters
----------
name : str
The name of the object.
obj : T
The object itself.
parent : str
The parent as a path in the current database.
2. Using an object with a name method and the parent as a path.
Parameters
----------
obj : T
The object itself. The object must implement a name() method that
returns a string.
parent : str
The parent as a path in the current database.
3. Using a name, an object, and the parent as an object.
Parameters
----------
name : str
The name of the object.
obj : T
The object itself.
parent : T
The parent object, which must already be in the database.
4. Using an object with a name method and the parent as an object.
Parameters
----------
obj : T
The object itself. The object must implement a name() method that
returns a string.
parent : T
The parent object, which must already be in the database.
"""
if len(args) == 2:
obj = args[0]
parent = args[1]
if isinstance(parent, str):
parent_id = self._resolveSingle(args[1])
else:
parent_id = self._getNode(args[1])
if not hasattr(obj, "name"):
raise ValueError(f"Object {obj} has no attribute name.")
name: str = obj.name()
self._addObject(name, obj, parent_id)
elif len(args) == 3:
name = args[0]
obj = args[1]
parent = args[2]
if isinstance(parent, str):
parent_id = self._resolveSingle(parent)
else:
parent_id = self._getNode(parent)
self._addObject(name, obj, parent_id)
[docs]
def getObjects(self, path: str, parent: T | None = None) -> list[T]:
"""Get objects in the database.
Parameters
----------
path : str
The path to use to find objects.
parent : T | None
The parent to start searching from. If None, then the root is used.
Returns
-------
list[T]
A list of objects found.
"""
if parent is not None:
parent_id = self._getNode(parent)
else:
parent_id = id(self.root)
objs: list[T] = []
for x in self._resolve(path, parent_id):
obj = self._getObject(x)
if obj is not None:
objs.append(obj)
return objs
[docs]
def getObject(self, path: str, parent: T | None = None) -> T:
"""Get object in the database.
Parameters
----------
path : str
The path to use to find the object.
parent : T | None
The parent to start searching from. If None, then the root is used.
Returns
-------
T
The object that matches the path.
"""
dark = self.getObjects(path, parent)
if len(dark) == 1:
return dark[0]
elif len(dark) > 1:
raise ValueError(f"Found non-unique object matching {path}.")
else:
raise ValueError(f"Did not find any objects matching {path}.")
def _getObject(self, obj_id: int) -> T | None:
"""Get the object at the given ID.
Parameters
----------
obj_id : int
The ID of the object to get.
Returns
-------
T | None
The object.
"""
obj_ref = self.graph.nodes[obj_id]["obj_ref"]
if obj_ref is not None:
return obj_ref()
else:
return None
def _getNode(self, obj: T) -> int:
"""Get the node associated with a given object.
Parameters
----------
obj : T
The object to find the node for.
Returns
-------
int
The integer of the node associated with that object.
"""
return id(obj)
def _addObject(self, name: str, obj: T, parent: int):
"""Add an object to the database.
Parameters
----------
name : str
The name of the object ot add.
obj : T
The object itself.
parent : int
The ID of the parent node to attach to.
"""
node_id = id(obj)
self.graph.add_node(
node_id,
name=name,
obj_ref=ref(obj),
)
self.graph.add_edge(parent, node_id)
def _resolve(self, path: str, parent: int | None = None) -> list[int]:
"""Find all the nodes at a given path.
Parameters
----------
path : str
The path to use to find node(s).
parent : int | None
The parent to start the path traversal from.
Returns
-------
list[int]
A list of all nodes at the given path.
"""
if parent is None:
if path[0] != "/":
raise ValueError("Parent is None, but path is not absolute.")
start = id(self.root)
else:
start = parent
parts = [p for p in path.split("/") if p]
# Each state is (node, index into parts)
queue = deque([(start, 0)])
results: set[int] = set()
while queue:
node, i = queue.popleft()
# Finished matching the pattern
if i == len(parts):
results.add(node)
continue
part = parts[i]
if part == ".":
queue.append((node, i + 1))
elif part == "..":
preds = list(self.graph.predecessors(node))
nd_parent = preds[0] if preds else node
queue.append((nd_parent, i + 1))
elif part == "**":
# Case 1: ** matches zero components
queue.append((node, i + 1))
# Case 2: ** matches one or more components
for child in self.graph.successors(node):
queue.append((child, i))
else:
for child in self.graph.successors(node):
name = self.graph.nodes[child]["name"]
if fnmatch(name, part):
queue.append((child, i + 1))
return list(results)
def _resolveSingle(self, path: str, parent: int | None = None) -> int:
"""Find the node at the given path.
Parameters
----------
path : str
The path to use to find node(s).
parent : int | None
The parent to start the path traversal from.
Returns
-------
int
The node at the given path.
"""
matches = self._resolve(path, parent=parent)
if len(matches) != 1:
raise ValueError(f"Expected 1 match for '{path}', got {len(matches)}")
return matches[0]
[docs]
def getClosest(self, start: T, name: str) -> T:
"""Get the closest object with the given name.
If more than one object has this name at the same closest distance,
then an error is thrown.
Parameters
----------
start : T
Where to start searching from.
name : str
The name of the object to look for. This can use globs and wildcards.
Returns
-------
T
The closest object with that name.
"""
visited: set[int] = set()
visited.add(id(start))
queue = deque([(id(start), 0)])
matches = []
found_distance = None
while queue:
node, dist = queue.popleft()
# If we've gone past the first match distance, stop
if found_distance is not None and dist > found_distance:
break
node_name = self.graph.nodes[node]["name"]
if fnmatch(node_name, name):
matches.append(node)
found_distance = dist
continue
# Expand neighbors
neighbors = list(self.graph.successors(node))
parent = next(self.graph.predecessors(node), None)
if parent is not None:
neighbors.append(parent)
for n in neighbors:
if n not in visited:
visited.add(n)
queue.append((n, dist + 1))
if not matches:
raise ValueError(f"No node named '{name}' found")
if len(matches) > 1:
raise ValueError(
f"Multiple nodes named '{name}' at distance {found_distance} were found."
)
obj = self.graph.nodes[matches[0]]["obj_ref"]()
if obj is None:
raise ValueError("Found unique match, but the Python weakref no longer exists.")
else:
return obj
[docs]
def dumpTree(self, node: int | None = None, prefix: str = "", is_last: bool = True):
"""Display the database as a tree on the command line.
This is a recursive method that is meant to be called with the
keyword arguments as their defaults.
Parameters
----------
node : int | None
The node to dump from.
prefix : str
The prefix to use when printing this nodes information.
is_last : bool
If this is the last node at the current level.
"""
if node is None:
node = id(self.root)
print(self.graph.nodes[node]["name"])
new_prefix = prefix
else:
connector = "└── " if is_last else "├── "
print(prefix + connector + self.graph.nodes[node]["name"])
new_prefix = prefix + (" " if is_last else "│ ")
children = list(self.graph.successors(node))
for i, child in enumerate(children):
last = i == len(children) - 1
self.dumpTree(child, new_prefix, last)