Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/model.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from __future__ import annotations | from __future__ import annotations | ||||
import sys | import sys | ||||
import json | import json | ||||
from pathlib import PosixPath | from pathlib import PosixPath | ||||
from typing import Any, Dict, Tuple | from typing import Any, Dict, Tuple, Iterable | ||||
from enum import Enum | from enum import Enum | ||||
from .plot import sunburst | from .plot import sunburst | ||||
from .exceptions import InvalidObjectType | from .exceptions import InvalidObjectType | ||||
from swh.model.identifiers import DIRECTORY, CONTENT | from swh.model.identifiers import DIRECTORY, CONTENT | ||||
Show All 11 Lines | |||||
class Tree: | class Tree: | ||||
"""Representation of a file system structure | """Representation of a file system structure | ||||
""" | """ | ||||
def __init__(self, path: PosixPath, father: Tree = None): | def __init__(self, path: PosixPath, father: Tree = None): | ||||
self.father = father | self.father = father | ||||
self.path = path | self.path = path | ||||
self.otype = DIRECTORY if path.is_dir() else CONTENT | self.otype = DIRECTORY if path.is_dir() else CONTENT | ||||
self.pid = "" | self.swhid = "" | ||||
self.known = False | |||||
self.children: Dict[PosixPath, Tree] = {} | self.children: Dict[PosixPath, Tree] = {} | ||||
def addNode(self, path: PosixPath, pid: str = None) -> None: | def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: | ||||
"""Recursively add a new path. | """Recursively add a new path. | ||||
""" | """ | ||||
relative_path = path.relative_to(self.path) | relative_path = path.relative_to(self.path) | ||||
if relative_path == PosixPath("."): | if relative_path == PosixPath("."): | ||||
if pid is not None: | self.swhid = swhid | ||||
self.pid = pid | self.known = known | ||||
return | return | ||||
new_path = self.path.joinpath(relative_path.parts[0]) | new_path = self.path.joinpath(relative_path.parts[0]) | ||||
if new_path not in self.children: | if new_path not in self.children: | ||||
self.children[new_path] = Tree(new_path, self) | self.children[new_path] = Tree(new_path, self) | ||||
self.children[new_path].addNode(path, pid) | self.children[new_path].addNode(path, swhid, known) | ||||
def show(self, format) -> None: | def show(self, format) -> None: | ||||
"""Show tree in different formats""" | """Show tree in different formats""" | ||||
if format == "json": | if format == "json": | ||||
print(json.dumps(self.getTree(), indent=4, sort_keys=True)) | print(json.dumps(self.getTree(), indent=4, sort_keys=True)) | ||||
elif format == "text": | elif format == "text": | ||||
isatty = sys.stdout.isatty() | isatty = sys.stdout.isatty() | ||||
Show All 13 Lines | def printChildren(self, isatty: bool, inc: int = 1) -> None: | ||||
node.printChildren(isatty, inc + 1) | node.printChildren(isatty, inc + 1) | ||||
def printNode(self, node: Any, isatty: bool, inc: int) -> None: | def printNode(self, node: Any, isatty: bool, inc: int) -> None: | ||||
rel_path = str(node.path.relative_to(self.path)) | rel_path = str(node.path.relative_to(self.path)) | ||||
begin = "│ " * inc | begin = "│ " * inc | ||||
end = "/" if node.otype == DIRECTORY else "" | end = "/" if node.otype == DIRECTORY else "" | ||||
if isatty: | if isatty: | ||||
if not node.pid: | if not node.known: | ||||
rel_path = colorize(rel_path, Color.red) | rel_path = colorize(rel_path, Color.red) | ||||
elif node.otype == DIRECTORY: | elif node.otype == DIRECTORY: | ||||
rel_path = colorize(rel_path, Color.blue) | rel_path = colorize(rel_path, Color.blue) | ||||
elif node.otype == CONTENT: | elif node.otype == CONTENT: | ||||
rel_path = colorize(rel_path, Color.green) | rel_path = colorize(rel_path, Color.green) | ||||
print(f"{begin}{rel_path}{end}") | print(f"{begin}{rel_path}{end}") | ||||
@property
def attributes(self) -> Dict[str, Dict[str, Any]]:
    """Describe this single node as a one-entry dictionary.

    The key is ``str(self.path)`` exactly as stored on the node; no
    re-basing against a parent path happens here. Children of the node are
    not included.

    Returns:
        a dictionary of the form
        ``{str(self.path): {"swhid": self.swhid, "known": self.known}}``
    """
    return {
        str(self.path): {
            "swhid": self.swhid,
            "known": self.known,
        }
    }
def toDict(self, dict_nodes=None) -> Dict[str, Dict[str, Any]]:
    """Flatten every descendant of this node into a single dictionary.

    Each one-entry mapping yielded by ``self.iterate()`` is merged into the
    result with ``dict.update``, so a later entry with the same key replaces
    an earlier one. Because ``iterate()`` only yields descendants, the node
    this method is called on is not part of the result.

    For example, calling this on ``root`` for the following structure:

    .. code-block:: none

        root {
            subdir: {
                file.txt
            }
        }

    produces one entry per descendant, keyed by the string form of each
    node's path:

    .. code-block:: none

        {
            "root/subdir": {
                "swhid": "...",
                "known": True/False
            },
            "root/subdir/file.txt": {
                "swhid": "...",
                "known": True/False
            }
        }

    Args:
        dict_nodes: optional dictionary to merge the entries into; it is
            updated in place. When omitted, a fresh dictionary is created
            for every call.

    Returns:
        the dictionary holding the merged entries (``dict_nodes`` itself
        when one was supplied).
    """
    # NOTE(review, vlorentz): for the return value of `attributes`, the
    # suggested formatting is one dictionary key per line.

    # A `{}` default would be created once and shared by every call, leaking
    # entries from one tree into the next; build the dict per call instead.
    if dict_nodes is None:
        dict_nodes = {}
    for node_dict in self.iterate():
        dict_nodes.update(node_dict)
    return dict_nodes
def iterate(self) -> Iterable[Dict[str, Dict]]:
    """Walk through every descendant of the current node.

    Children are visited in the order ``self.children`` holds them. For
    each child its ``attributes`` mapping is produced first; when the child
    is a directory, everything below it follows before the next sibling.

    Yields:
        the ``attributes`` dictionary of each descendant, one per node
    """
    for descendant in self.children.values():
        yield descendant.attributes
        if descendant.otype == DIRECTORY:
            # Descend only into directories.
            yield from descendant.iterate()
def getTree(self): | def getTree(self): | ||||
"""Walk through the tree to discover content or directory that have | """Walk through the tree to discover content or directory that have | ||||
a persistent identifier. If a persistent identifier is found it saves | a persistent identifier. If a persistent identifier is found it saves | ||||
the path with the relative PID. | the path with the relative PID. | ||||
Returns: | Returns: | ||||
child_tree: the tree with the content/directory found | child_tree: the tree with the content/directory found | ||||
""" | """ | ||||
child_tree = {} | child_tree = {} | ||||
for path, child_node in self.children.items(): | for path, child_node in self.children.items(): | ||||
rel_path = str(child_node.path.relative_to(self.path)) | rel_path = str(child_node.path.relative_to(self.path)) | ||||
if child_node.pid: | if child_node.swhid: | ||||
child_tree[rel_path] = child_node.pid | child_tree[rel_path] = child_node.swhid | ||||
else: | else: | ||||
next_tree = child_node.getTree() | next_tree = child_node.getTree() | ||||
if next_tree: | if next_tree: | ||||
child_tree[rel_path] = next_tree | child_tree[rel_path] = next_tree | ||||
return child_tree | return child_tree | ||||
def __getSubDirsInfo(self, root, directories): | def __getSubDirsInfo(self, root, directories): | ||||
Show All 37 Lines | def count_contents(self) -> Tuple[int, int]: | ||||
contents = 0 | contents = 0 | ||||
discovered = 0 | discovered = 0 | ||||
if not self.otype == DIRECTORY: | if not self.otype == DIRECTORY: | ||||
raise InvalidObjectType( | raise InvalidObjectType( | ||||
"Can't calculate contents of the " "object type: %s" % self.otype | "Can't calculate contents of the " "object type: %s" % self.otype | ||||
) | ) | ||||
if self.pid: | if self.known: | ||||
# to identify a directory with all files/directories present | # to identify a directory with all files/directories present | ||||
return (1, 1) | return (1, 1) | ||||
else: | else: | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
if child_node.otype == CONTENT: | if child_node.otype == CONTENT: | ||||
contents += 1 | contents += 1 | ||||
if child_node.pid: | if child_node.known: | ||||
discovered += 1 | discovered += 1 | ||||
return (contents, discovered) | return (contents, discovered) | ||||
def has_dirs(self) -> bool: | def has_dirs(self) -> bool: | ||||
"""Checks if node has directories | """Checks if node has directories | ||||
""" | """ | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
return True | return True | ||||
return False | return False |
Neither the name nor the docstring is very clear about what it returns.