Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/model.py
Show All 16 Lines | |||||
from .exceptions import InvalidDirectoryPath, InvalidObjectType | from .exceptions import InvalidDirectoryPath, InvalidObjectType | ||||
from .plot import generate_sunburst, offline_plot | from .plot import generate_sunburst, offline_plot | ||||
class Color(Enum): | class Color(Enum): | ||||
blue = "\033[94m" | blue = "\033[94m" | ||||
green = "\033[92m" | green = "\033[92m" | ||||
yellow = "\033[93m" | |||||
magenta = "\033[95m" | |||||
red = "\033[91m" | red = "\033[91m" | ||||
end = "\033[0m" | end = "\033[0m" | ||||
def colorize(text: str, color: Color): | def colorize(text: str, color: Color): | ||||
return color.value + text + Color.end.value | return color.value + text + Color.end.value | ||||
class Status(Enum): | |||||
unset = 0 | |||||
set = 1 | |||||
queried = 2 | |||||
class Tree: | class Tree: | ||||
"""Representation of a file system structure | """Representation of a file system structure | ||||
""" | """ | ||||
def __init__(self, path: Path, father: Tree = None): | def __init__(self, path: Path, father: Tree = None): | ||||
self.father = father | self.father = father | ||||
self.path = path | self.path = path | ||||
self.otype = DIRECTORY if path.is_dir() else CONTENT | self.otype = DIRECTORY if path.is_dir() else CONTENT | ||||
self.swhid = "" | self.swhid = "" | ||||
self.known = False | self.known = False | ||||
self.status = Status.unset | |||||
self.children: Dict[Path, Tree] = {} | self.children: Dict[Path, Tree] = {} | ||||
def add_node(self, path: Path, swhid: str, known: bool) -> None: | def __len__(self): | ||||
return sum(1 for node in self.iterate()) + 1 # the root node | |||||
def add_node(self, path: Path, swhid: str) -> None: | |||||
"""Recursively add a new path. | """Recursively add a new path. | ||||
""" | """ | ||||
relative_path = path.relative_to(self.path) | relative_path = path.relative_to(self.path) | ||||
if relative_path == Path("."): | if relative_path == Path("."): | ||||
self.swhid = swhid | self.swhid = swhid | ||||
self.known = known | |||||
return | return | ||||
new_path = self.path.joinpath(relative_path.parts[0]) | new_path = self.path.joinpath(relative_path.parts[0]) | ||||
if new_path not in self.children: | if new_path not in self.children: | ||||
self.children[new_path] = Tree(new_path, self) | self.children[new_path] = Tree(new_path, self) | ||||
self.children[new_path].add_node(path, swhid, known) | self.children[new_path].add_node(path, swhid) | ||||
def show(self, fmt) -> None: | def show(self, fmt) -> None: | ||||
"""Show tree in different formats""" | """Show tree in different formats""" | ||||
if fmt == "json": | if fmt == "json": | ||||
print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) | print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) | ||||
if fmt == "ndjson": | if fmt == "ndjson": | ||||
print(ndjson.dumps(node.attributes for node in self.iterate())) | print(ndjson.dumps(node.attributes for node in self.iterate())) | ||||
Show All 17 Lines | def print_children(self, isatty: bool, inc: int = 1) -> None: | ||||
node.print_children(isatty, inc + 1) | node.print_children(isatty, inc + 1) | ||||
def print_node(self, node: Any, isatty: bool, inc: int) -> None: | def print_node(self, node: Any, isatty: bool, inc: int) -> None: | ||||
rel_path = str(node.path.relative_to(self.path)) | rel_path = str(node.path.relative_to(self.path)) | ||||
begin = "│ " * inc | begin = "│ " * inc | ||||
end = "/" if node.otype == DIRECTORY else "" | end = "/" if node.otype == DIRECTORY else "" | ||||
if isatty: | if isatty: | ||||
if not node.known: | if node.status == Status.unset: | ||||
rel_path = colorize(rel_path, Color.red) | rel_path = colorize(rel_path, Color.magenta) | ||||
elif node.otype == DIRECTORY: | elif node.status == Status.set and not node.known: | ||||
rel_path = colorize(rel_path, Color.yellow) | |||||
elif node.status == Status.set and node.known: | |||||
rel_path = colorize(rel_path, Color.blue) | rel_path = colorize(rel_path, Color.blue) | ||||
elif node.otype == CONTENT: | elif node.status == Status.queried and not node.known: | ||||
rel_path = colorize(rel_path, Color.red) | |||||
elif node.status == Status.queried and node.known: | |||||
rel_path = colorize(rel_path, Color.green) | rel_path = colorize(rel_path, Color.green) | ||||
print(f"{begin}{rel_path}{end}") | print(f"{begin}{rel_path}{end}") | ||||
@property | @property | ||||
def known(self): | |||||
return self._known | |||||
@known.setter | |||||
def known(self, value: bool): | |||||
self._known = value | |||||
self.status = Status.set | |||||
@property | |||||
def attributes(self) -> Dict[str, Dict[str, Any]]: | def attributes(self) -> Dict[str, Dict[str, Any]]: | ||||
""" | """ | ||||
Get the attributes of the current node grouped by the relative path. | Get the attributes of the current node grouped by the relative path. | ||||
Returns: | Returns: | ||||
a dictionary containing a path as key and its known/unknown status and the | a dictionary containing a path as key and its known/unknown status and the | ||||
SWHID as values. | SWHID as values. | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def iterate(self) -> Iterator[Tree]: | ||||
Recursively iterate through the children of the current node | Recursively iterate through the children of the current node | ||||
""" | """ | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
yield child_node | yield child_node | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
yield from child_node.iterate() | yield from child_node.iterate() | ||||
def iterate_bfs(self) -> Iterator[Tree]: | |||||
"""Get nodes in BFS order | |||||
""" | |||||
nodes = [] | |||||
nodes.append(self) | |||||
while len(nodes) > 0: | |||||
for node in nodes.copy(): | |||||
yield node | |||||
nodes.remove(node) | |||||
if node.otype == DIRECTORY: | |||||
nodes.extend(list(node.children.values())) | |||||
def get_files_from_dir(self, dir_path: Path) -> List: | def get_files_from_dir(self, dir_path: Path) -> List: | ||||
""" | """ | ||||
Retrieve files information about a specific directory path | Retrieve files information about a specific directory path | ||||
Returns: | Returns: | ||||
A list containing the files attributes present inside the directory given | A list containing the files attributes present inside the directory given | ||||
in input | in input | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | class Tree: | ||||
def has_dirs(self) -> bool: | def has_dirs(self) -> bool: | ||||
"""Checks if node has directories | """Checks if node has directories | ||||
""" | """ | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
return True | return True | ||||
return False | return False | ||||
def has_contents(self) -> bool: | |||||
for _, child_node in self.children.items(): | |||||
if child_node.otype == CONTENT: | |||||
return True | |||||
return False |