Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 5502fd3..9ed9e2f 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,229 +1,239 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import sys
import json
from pathlib import PosixPath
from typing import Any, Dict, Tuple, Iterable
from enum import Enum
import ndjson
from .plot import generate_sunburst, offline_plot
from .exceptions import InvalidObjectType
from swh.model.identifiers import DIRECTORY, CONTENT
class Color(Enum):
    """ANSI escape sequences used to colorize terminal output."""

    blue = "\033[94m"
    green = "\033[92m"
    red = "\033[91m"
    # Reset sequence appended after colored text (see ``colorize``).
    end = "\033[0m"
def colorize(text: str, color: Color) -> str:
    """Wrap ``text`` between the escape sequence of ``color`` and the reset one.

    Args:
        text: the string to colorize
        color: the ``Color`` member whose value is prepended to ``text``

    Returns:
        ``text`` prefixed with ``color.value`` and suffixed with
        ``Color.end.value``.
    """
    return color.value + text + Color.end.value
class Tree:
    """Representation of a file system structure.

    Every node wraps a single path and stores its object type (``DIRECTORY``
    when the path is a directory, ``CONTENT`` otherwise), its SWHID, its
    known/unknown status and its direct children indexed by their path.
    """

    def __init__(self, path: PosixPath, father: "Tree" = None):
        self.father = father
        self.path = path
        self.otype = DIRECTORY if path.is_dir() else CONTENT
        self.swhid = ""
        self.known = False
        self.children: Dict[PosixPath, "Tree"] = {}

    def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
        """Recursively add a new path.

        Intermediate nodes are created on the way down when they are missing;
        the node matching ``path`` receives ``swhid`` and ``known``.

        Args:
            path: the path to add; it must be the path of this node or be
                located under it (``relative_to`` raises ``ValueError``
                otherwise)
            swhid: the identifier to store on the node matching ``path``
            known: the known/unknown status to store on that node
        """
        relative_path = path.relative_to(self.path)
        if relative_path == PosixPath("."):
            # ``path`` designates the current node: record its information.
            self.swhid = swhid
            self.known = known
            return

        # Descend into the child owning the first component of the remainder.
        new_path = self.path.joinpath(relative_path.parts[0])
        if new_path not in self.children:
            self.children[new_path] = Tree(new_path, self)
        self.children[new_path].addNode(path, swhid, known)

    def show(self, format) -> None:
        """Show tree in different formats.

        Args:
            format: one of ``"json"``, ``"ndjson"``, ``"text"`` or
                ``"sunburst"``; any other value does nothing
        """
        if format == "json":
            print(json.dumps(self.toDict(), indent=4, sort_keys=True))
        elif format == "ndjson":
            print(ndjson.dumps(self.__iterNodesAttr()))
        elif format == "text":
            # Only emit escape sequences when writing to a terminal.
            isatty = sys.stdout.isatty()
            print(colorize(str(self.path), Color.blue) if isatty else str(self.path))
            self.printChildren(isatty)
        elif format == "sunburst":
            root = self.path
            directories = self.getDirectoriesInfo(root)
            sunburst = generate_sunburst(directories, root)
            offline_plot(sunburst)

    def printChildren(self, isatty: bool, inc: int = 1) -> None:
        """Print every descendant of the current node, one per line.

        Args:
            isatty: whether the output may be colorized
            inc: nesting level, used to repeat the line prefix
        """
        for node in self.children.values():
            self.printNode(node, isatty, inc)
            if node.children:
                node.printChildren(isatty, inc + 1)

    def printNode(self, node: Any, isatty: bool, inc: int) -> None:
        """Print ``node`` using its path relative to the current node.

        Directories get a trailing ``/``. When ``isatty`` is true the path is
        red for unknown nodes, blue for known directories and green for known
        contents.
        """
        rel_path = str(node.path.relative_to(self.path))
        begin = "│ " * inc
        end = "/" if node.otype == DIRECTORY else ""

        if isatty:
            if not node.known:
                rel_path = colorize(rel_path, Color.red)
            elif node.otype == DIRECTORY:
                rel_path = colorize(rel_path, Color.blue)
            elif node.otype == CONTENT:
                rel_path = colorize(rel_path, Color.green)

        print(f"{begin}{rel_path}{end}")

    @property
    def attributes(self):
        """
        Get the attributes of the current node grouped by its path.

        Returns:
            a dictionary containing a path as key and its known/unknown status
            and the Software Heritage persistent identifier as values.
        """
        return {str(self.path): {"swhid": self.swhid, "known": self.known}}

    def toDict(self, dict_nodes=None) -> Dict[str, Dict[str, Dict]]:
        """
        Recursively groups the nodes below the current one inside a dictionary.

        For example, if you have the following structure:

        .. code-block:: none

            root {
                subdir: {
                    file.txt
                }
            }

        The generated dictionary will be:

        .. code-block:: none

            {
                "root/subdir": {
                    "swhid": "...",
                    "known": True/False
                }
                "root/subdir/file.txt": {
                    "swhid": "...",
                    "known": True/False
                }
            }

        The current node itself is not part of the result.

        Args:
            dict_nodes: optional dictionary to fill; a new one is created on
                every call when omitted

        Returns:
            the dictionary that has been filled
        """
        # A ``{}`` default would be shared by every call (and every tree), so
        # results of previous calls would leak into the next ones.
        if dict_nodes is None:
            dict_nodes = {}

        for node_dict in self.__iterNodesAttr():
            dict_nodes.update(node_dict)
        return dict_nodes

    def iterate(self) -> Iterable["Tree"]:
        """
        Recursively iterate through the children of the current node

        Yields:
            every node below the current one, each directory being followed by
            its own descendants
        """
        for child_node in self.children.values():
            yield child_node
            if child_node.otype == DIRECTORY:
                yield from child_node.iterate()

    def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]:
        """
        Recursively iterate through the children of the current node returning
        an iterable of the children nodes attributes

        Yields:
            a dictionary containing a path with its known/unknown status and the
            Software Heritage persistent identifier
        """
        # ``iterate`` already walks the whole subtree; recursing again from
        # here would yield nested nodes more than once.
        for child_node in self.iterate():
            yield child_node.attributes

    def __getSubDirsInfo(self, root, directories):
        """Fills the directories given in input with the contents information
        stored inside the directory child, only if they have contents.

        Args:
            root: the path the keys added to ``directories`` are relative to
            directories: the dictionary to fill in place
        """
        for path, child_node in self.children.items():
            if child_node.otype == DIRECTORY:
                rel_path = path.relative_to(root)
                contents_info = child_node.count_contents()
                # The first element of the tuple is the number of contents of
                # the directory: zero means there is nothing to report.
                if contents_info[0] != 0:
                    directories[rel_path] = contents_info
                if child_node.has_dirs():
                    child_node.__getSubDirsInfo(root, directories)

    def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
        """Get information about all directories under the given root.

        Returns:
            A dictionary with a directory path as key and the relative
            contents information (the result of count_contents) as values.
            The entry of the current node is stored under ``root`` itself,
            sub-directories are stored under their path relative to ``root``.
        """
        directories = {root: self.count_contents()}
        self.__getSubDirsInfo(root, directories)
        return directories

    def count_contents(self) -> Tuple[int, int]:
        """Count how many contents are present inside a directory.

        A known directory is reported as ``(1, 1)``; otherwise only the direct
        children of type ``CONTENT`` are counted.

        Returns:
            A tuple with the total number of the contents and the number
            of contents known (the ones that have a persistent identifier).

        Raises:
            InvalidObjectType: if the current node is not a directory
        """
        if self.otype != DIRECTORY:
            raise InvalidObjectType(
                "Can't calculate contents of the object type: %s" % self.otype
            )

        if self.known:
            # to identify a directory with all files/directories present
            return (1, 1)

        contents = 0
        discovered = 0
        for child_node in self.children.values():
            if child_node.otype == CONTENT:
                contents += 1
                if child_node.known:
                    discovered += 1
        return (contents, discovered)

    def has_dirs(self) -> bool:
        """Checks if node has directories among its direct children."""
        return any(
            child_node.otype == DIRECTORY for child_node in self.children.values()
        )
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index b9bc544..712e28d 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,106 +1,106 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import json
from pathlib import PosixPath
from .data import correct_api_response, present_swhids, to_exclude_swhid
from swh.scanner.scanner import pids_discovery, get_subpaths, run
from swh.scanner.model import Tree
from swh.scanner.cli import extract_regex_objs
from swh.scanner.exceptions import APIError
# URL registered on the mocked HTTP responses in the tests below; those tests
# pass the base URL "http://example.org/api/" to pids_discovery.
aio_url = "http://example.org/api/known/"
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """pids_discovery returns the JSON body served by the mocked endpoint."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    actual_result = event_loop.run_until_complete(
        pids_discovery([], aiosession, "http://example.org/api/")
    )

    assert correct_api_response == actual_result
def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """pids_discovery raises APIError when the mocked endpoint answers 413."""
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        event_loop.run_until_complete(
            pids_discovery([], aiosession, "http://example.org/api/")
        )
def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """pids_discovery raises APIError when 901 identifiers are submitted."""
    api_url = live_server.url() + "/"
    # /known/ is limited at 900
    request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"] * 901

    with pytest.raises(APIError):
        event_loop.run_until_complete(pids_discovery(request, aiosession, api_url))
def test_scanner_get_subpaths(temp_folder):
    """get_subpaths reports exactly two real directories under the root."""
    root = temp_folder["root"]

    actual_result = []
    for subpath, pid in get_subpaths(root, tuple()):
        # Skip symlinks as well: the pytest tmp_dir fixture also creates a
        # symlink to each directory inside the tmp_dir path.
        if subpath.is_dir() and not subpath.is_symlink():
            actual_result.append((subpath, pid))

    assert len(actual_result) == 2
@pytest.mark.options(debug=False)
def test_app(app):
    """The ``app`` fixture honours the ``debug=False`` option set by the marker."""
    assert not app.debug
def test_scanner_result(live_server, event_loop, test_folder):
    """After a run, a node is known exactly when its SWHID is in present_swhids."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(run(sample_folder, api_url, source_tree, set()))

    for child_node in source_tree.iterate():
        # ``attributes`` maps the single node path to its information.
        node_info = next(iter(child_node.attributes.values()))
        expected_known = node_info["swhid"] in present_swhids
        assert node_info["known"] is expected_known
def test_scanner_result_with_exclude_patterns(live_server, event_loop, test_folder):
    """No node of the resulting tree carries the SWHID of the excluded path."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))
    patterns = (str(sample_folder) + "/toexclude",)
    exclude_pattern = set(extract_regex_objs(sample_folder, patterns))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(
        run(sample_folder, api_url, source_tree, exclude_pattern)
    )

    for child_node in source_tree.iterate():
        # ``attributes`` maps the single node path to its information.
        node_info = next(iter(child_node.attributes.values()))
        assert node_info["swhid"] != to_exclude_swhid

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:01 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3273810

Event Timeline