Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/model.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from __future__ import annotations | from __future__ import annotations | ||||
import sys | import sys | ||||
import json | import json | ||||
from pathlib import PosixPath | from pathlib import Path | ||||
from typing import Any, Dict, Tuple, Iterable, List | from typing import Any, Dict, Tuple, Iterable, List | ||||
from enum import Enum | from enum import Enum | ||||
import ndjson | import ndjson | ||||
from .plot import generate_sunburst, offline_plot | from .plot import generate_sunburst, offline_plot | ||||
from .exceptions import InvalidObjectType, InvalidDirectoryPath | from .exceptions import InvalidObjectType, InvalidDirectoryPath | ||||
from swh.model.identifiers import DIRECTORY, CONTENT | from swh.model.identifiers import DIRECTORY, CONTENT | ||||
Show All 9 Lines | |||||
def colorize(text: str, color: Color): | def colorize(text: str, color: Color): | ||||
return color.value + text + Color.end.value | return color.value + text + Color.end.value | ||||
class Tree: | class Tree: | ||||
"""Representation of a file system structure | """Representation of a file system structure | ||||
""" | """ | ||||
def __init__(self, path: PosixPath, father: Tree = None): | def __init__(self, path: Path, father: Tree = None): | ||||
self.father = father | self.father = father | ||||
self.path = path | self.path = path | ||||
self.otype = DIRECTORY if path.is_dir() else CONTENT | self.otype = DIRECTORY if path.is_dir() else CONTENT | ||||
self.swhid = "" | self.swhid = "" | ||||
self.known = False | self.known = False | ||||
self.children: Dict[PosixPath, Tree] = {} | self.children: Dict[Path, Tree] = {} | ||||
def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: | def addNode(self, path: Path, swhid: str, known: bool) -> None: | ||||
"""Recursively add a new path. | """Recursively add a new path. | ||||
""" | """ | ||||
relative_path = path.relative_to(self.path) | relative_path = path.relative_to(self.path) | ||||
if relative_path == PosixPath("."): | if relative_path == Path("."): | ||||
self.swhid = swhid | self.swhid = swhid | ||||
self.known = known | self.known = known | ||||
return | return | ||||
new_path = self.path.joinpath(relative_path.parts[0]) | new_path = self.path.joinpath(relative_path.parts[0]) | ||||
if new_path not in self.children: | if new_path not in self.children: | ||||
self.children[new_path] = Tree(new_path, self) | self.children[new_path] = Tree(new_path, self) | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]: | ||||
a dictionary containing a path with its known/unknown status and the | a dictionary containing a path with its known/unknown status and the | ||||
SWHID | SWHID | ||||
""" | """ | ||||
for child_node in self.iterate(): | for child_node in self.iterate(): | ||||
yield child_node.attributes | yield child_node.attributes | ||||
if child_node.otype == DIRECTORY: | if child_node.otype == DIRECTORY: | ||||
yield from child_node.__iterNodesAttr() | yield from child_node.__iterNodesAttr() | ||||
def getFilesFromDir(self, dir_path: PosixPath) -> List: | def getFilesFromDir(self, dir_path: Path) -> List: | ||||
""" | """ | ||||
Retrieve files information about a specific directory path | Retrieve files information about a specific directory path | ||||
Returns: | Returns: | ||||
A list containing the files attributes present inside the directory given | A list containing the files attributes present inside the directory given | ||||
in input | in input | ||||
""" | """ | ||||
Show All 26 Lines | def __getSubDirsInfo(self, root, directories): | ||||
# (the number of contents in a directory) | # (the number of contents in a directory) | ||||
# if it is equal to zero it means that there are no contents | # if it is equal to zero it means that there are no contents | ||||
# in that directory. | # in that directory. | ||||
if not contents_info[0] == 0: | if not contents_info[0] == 0: | ||||
directories[rel_path] = contents_info | directories[rel_path] = contents_info | ||||
if child_node.has_dirs(): | if child_node.has_dirs(): | ||||
child_node.__getSubDirsInfo(root, directories) | child_node.__getSubDirsInfo(root, directories) | ||||
def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]: | def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]: | ||||
"""Get information about all directories under the given root. | """Get information about all directories under the given root. | ||||
Returns: | Returns: | ||||
A dictionary with a directory path as key and the relative | A dictionary with a directory path as key and the relative | ||||
contents information (the result of count_contents) as values. | contents information (the result of count_contents) as values. | ||||
""" | """ | ||||
directories = {root: self.count_contents()} | directories = {root: self.count_contents()} | ||||
Show All 39 Lines |