Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/model.py
Show All 36 Lines | class Tree: | ||||
def __init__(self, path: Path, father: Tree = None): | def __init__(self, path: Path, father: Tree = None): | ||||
self.father = father | self.father = father | ||||
self.path = path | self.path = path | ||||
self.otype = DIRECTORY if path.is_dir() else CONTENT | self.otype = DIRECTORY if path.is_dir() else CONTENT | ||||
self.swhid = "" | self.swhid = "" | ||||
self.known = False | self.known = False | ||||
self.children: Dict[Path, Tree] = {} | self.children: Dict[Path, Tree] = {} | ||||
def addNode(self, path: Path, swhid: str, known: bool) -> None: | def add_node(self, path: Path, swhid: str, known: bool) -> None: | ||||
"""Recursively add a new path. | """Recursively add a new path. | ||||
""" | """ | ||||
relative_path = path.relative_to(self.path) | relative_path = path.relative_to(self.path) | ||||
if relative_path == Path("."): | if relative_path == Path("."): | ||||
self.swhid = swhid | self.swhid = swhid | ||||
self.known = known | self.known = known | ||||
return | return | ||||
new_path = self.path.joinpath(relative_path.parts[0]) | new_path = self.path.joinpath(relative_path.parts[0]) | ||||
if new_path not in self.children: | if new_path not in self.children: | ||||
self.children[new_path] = Tree(new_path, self) | self.children[new_path] = Tree(new_path, self) | ||||
self.children[new_path].addNode(path, swhid, known) | self.children[new_path].add_node(path, swhid, known) | ||||
def show(self, format) -> None: | def show(self, fmt) -> None: | ||||
ardumont: Add a `str` type annotation to this parameter (i.e. `fmt: str`). | |||||
"""Show tree in different formats""" | """Show tree in different formats""" | ||||
if format == "json": | if fmt == "json": | ||||
print(json.dumps(self.toDict(), indent=4, sort_keys=True)) | print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) | ||||
if format == "ndjson": | if fmt == "ndjson": | ||||
print(ndjson.dumps(dict_path for dict_path in self.__iterNodesAttr())) | print(ndjson.dumps(dict_path for dict_path in self._iter_nodes_attr())) | ||||
elif format == "text": | elif fmt == "text": | ||||
isatty = sys.stdout.isatty() | isatty = sys.stdout.isatty() | ||||
print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) | print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) | ||||
self.printChildren(isatty) | self.print_children(isatty) | ||||
elif format == "sunburst": | elif fmt == "sunburst": | ||||
root = self.path | root = self.path | ||||
directories = self.getDirectoriesInfo(root) | directories = self.get_directories_info(root) | ||||
sunburst = generate_sunburst(directories, root) | sunburst = generate_sunburst(directories, root) | ||||
offline_plot(sunburst) | offline_plot(sunburst) | ||||
def printChildren(self, isatty: bool, inc: int = 1) -> None: | def print_children(self, isatty: bool, inc: int = 1) -> None: | ||||
for path, node in self.children.items(): | for path, node in self.children.items(): | ||||
self.printNode(node, isatty, inc) | self.print_node(node, isatty, inc) | ||||
if node.children: | if node.children: | ||||
node.printChildren(isatty, inc + 1) | node.print_children(isatty, inc + 1) | ||||
def printNode(self, node: Any, isatty: bool, inc: int) -> None: | def print_node(self, node: Any, isatty: bool, inc: int) -> None: | ||||
rel_path = str(node.path.relative_to(self.path)) | rel_path = str(node.path.relative_to(self.path)) | ||||
begin = "│ " * inc | begin = "│ " * inc | ||||
end = "/" if node.otype == DIRECTORY else "" | end = "/" if node.otype == DIRECTORY else "" | ||||
if isatty: | if isatty: | ||||
if not node.known: | if not node.known: | ||||
rel_path = colorize(rel_path, Color.red) | rel_path = colorize(rel_path, Color.red) | ||||
elif node.otype == DIRECTORY: | elif node.otype == DIRECTORY: | ||||
Show All 10 Lines | def attributes(self) -> Dict[str, Dict[str, Any]]: | ||||
Returns: | Returns: | ||||
a dictionary containing a path as key and its known/unknown status and the | a dictionary containing a path as key and its known/unknown status and the | ||||
SWHID as values. | SWHID as values. | ||||
""" | """ | ||||
return {str(self.path): {"swhid": self.swhid, "known": self.known,}} | return {str(self.path): {"swhid": self.swhid, "known": self.known,}} | ||||
def toDict(self) -> Dict[str, Dict[str, Any]]: | def to_dict(self) -> Dict[str, Dict[str, Any]]: | ||||
""" | """ | ||||
Recursively flatten the current tree nodes into a dictionary. | Recursively flatten the current tree nodes into a dictionary. | ||||
For example, if you have the following structure: | For example, if you have the following structure: | ||||
.. code-block:: none | .. code-block:: none | ||||
root { | root { | ||||
Show All 18 Lines | def to_dict(self) -> Dict[str, Dict[str, Any]]: | ||||
"root/subdir/file.txt": { | "root/subdir/file.txt": { | ||||
"swhid": "...", | "swhid": "...", | ||||
"known": True/False | "known": True/False | ||||
} | } | ||||
} | } | ||||
""" | """ | ||||
return {k: v for d in self.__iterNodesAttr() for k, v in d.items()} | return {k: v for d in self._iter_nodes_attr() for k, v in d.items()} | ||||
def iterate(self) -> Iterator[Tree]: | def iterate(self) -> Iterator[Tree]: | ||||
""" | """ | ||||
Recursively iterate through the children of the current node | Recursively iterate through the children of the current node | ||||
""" | """ | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
yield child_node | yield child_node | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
yield from child_node.iterate() | yield from child_node.iterate() | ||||
def __iterNodesAttr(self) -> Iterator[Dict[str, Dict[str, Any]]]: | def _iter_nodes_attr(self) -> Iterator[Dict[str, Dict[str, Any]]]: | ||||
""" | """ | ||||
Recursively iterate through the children of the current node returning | Recursively iterate through the children of the current node returning | ||||
an iterable of the children nodes attributes | an iterable of the children nodes attributes | ||||
Yields: | Yields: | ||||
a dictionary containing a path with its known/unknown status and the | a dictionary containing a path with its known/unknown status and the | ||||
SWHID | SWHID | ||||
""" | """ | ||||
for child_node in self.iterate(): | for child_node in self.iterate(): | ||||
yield child_node.attributes | yield child_node.attributes | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
yield from child_node.__iterNodesAttr() | yield from child_node._iter_nodes_attr() | ||||
Not Done — Inline Actions — douardda: Unrelated to this diff, but does this code do what it claims to do? Looks to me there a… | |||||
Not Done — Inline Actions — douardda: I'm not sure this method is actually useful. It seems to be used twice: in to_dict(), which can be rewritten (if I'm not mistaken) as `return {k: v for node in self.iterate() for k, v in node.attributes.items()}`, and in show(), where it can also be reworked as `print(ndjson.dumps(node.attributes for node in self.iterate()))`. | |||||
Done — Inline Actions — tenma: Yes, double recursion; I noted that for later. Not only is this branch not needed, it is actually harmful to correctness and performance. This function could be dropped altogether because it just applies `.attributes` on top of `iterate()`. | |||||
def getFilesFromDir(self, dir_path: Path) -> List: | def get_files_from_dir(self, dir_path: Path) -> List: | ||||
""" | """ | ||||
Retrieve files information about a specific directory path | Retrieve files information about a specific directory path | ||||
Returns: | Returns: | ||||
A list containing the files attributes present inside the directory given | A list containing the files attributes present inside the directory given | ||||
in input | in input | ||||
""" | """ | ||||
def getFiles(node): | def get_files(node): | ||||
files = [] | files = [] | ||||
for _, node in node.children.items(): | for _, node in node.children.items(): | ||||
if node.otype == CONTENT: | if node.otype == CONTENT: | ||||
files.append(node.attributes) | files.append(node.attributes) | ||||
return files | return files | ||||
if dir_path == self.path: | if dir_path == self.path: | ||||
return getFiles(self) | return get_files(self) | ||||
else: | else: | ||||
for node in self.iterate(): | for node in self.iterate(): | ||||
if node.path == dir_path: | if node.path == dir_path: | ||||
return getFiles(node) | return get_files(node) | ||||
raise InvalidDirectoryPath( | raise InvalidDirectoryPath( | ||||
"The directory provided doesn't match any stored directory" | "The directory provided doesn't match any stored directory" | ||||
) | ) | ||||
def __getSubDirsInfo(self, root, directories): | def _get_sub_dirs_info(self, root, directories): | ||||
Not Done — Inline Actions — ardumont: Might as well add types since you are in a refactoring session ;) | |||||
"""Fills the directories given in input with the contents information | """Fills the directories given in input with the contents information | ||||
stored inside the directory child, only if they have contents. | stored inside the directory child, only if they have contents. | ||||
""" | """ | ||||
for path, child_node in self.children.items(): | for path, child_node in self.children.items(): | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
rel_path = path.relative_to(root) | rel_path = path.relative_to(root) | ||||
contents_info = child_node.count_contents() | contents_info = child_node.count_contents() | ||||
# checks the first element of the tuple | # checks the first element of the tuple | ||||
# (the number of contents in a directory) | # (the number of contents in a directory) | ||||
# if it is equal to zero it means that there are no contents | # if it is equal to zero it means that there are no contents | ||||
# in that directory. | # in that directory. | ||||
if not contents_info[0] == 0: | if not contents_info[0] == 0: | ||||
directories[rel_path] = contents_info | directories[rel_path] = contents_info | ||||
if child_node.has_dirs(): | if child_node.has_dirs(): | ||||
child_node.__getSubDirsInfo(root, directories) | child_node._get_sub_dirs_info(root, directories) | ||||
def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]: | def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: | ||||
"""Get information about all directories under the given root. | """Get information about all directories under the given root. | ||||
Returns: | Returns: | ||||
A dictionary with a directory path as key and the relative | A dictionary with a directory path as key and the relative | ||||
contents information (the result of count_contents) as values. | contents information (the result of count_contents) as values. | ||||
""" | """ | ||||
directories = {root: self.count_contents()} | directories = {root: self.count_contents()} | ||||
self.__getSubDirsInfo(root, directories) | self._get_sub_dirs_info(root, directories) | ||||
return directories | return directories | ||||
def count_contents(self) -> Tuple[int, int]: | def count_contents(self) -> Tuple[int, int]: | ||||
"""Count how many contents are present inside a directory. | """Count how many contents are present inside a directory. | ||||
If a directory has a SWHID returns as it has all the contents. | If a directory has a SWHID returns as it has all the contents. | ||||
Returns: | Returns: | ||||
A tuple with the total number of the contents and the number | A tuple with the total number of the contents and the number | ||||
of contents known (the ones that have a persistent identifier). | of contents known (the ones that have a persistent identifier). | ||||
""" | """ | ||||
contents = 0 | contents = 0 | ||||
discovered = 0 | discovered = 0 | ||||
if not self.otype == DIRECTORY: | if not self.otype == DIRECTORY: | ||||
raise InvalidObjectType( | raise InvalidObjectType( | ||||
"Can't calculate contents of the " "object type: %s" % self.otype | "Can't count contents of the object type: %s" % self.otype | ||||
) | ) | ||||
if self.known: | if self.known: | ||||
# to identify a directory with all files/directories present | # to identify a directory with all files/directories present | ||||
return (1, 1) | return (1, 1) | ||||
else: | else: | ||||
for _, child_node in self.children.items(): | for _, child_node in self.children.items(): | ||||
if child_node.otype == CONTENT: | if child_node.otype == CONTENT: | ||||
Show All 13 Lines |
ardumont (inline comment on `show`): Add a `str` type annotation here, i.e. on the `fmt` parameter.