Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342787
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
View Options
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 5502fd3..9ed9e2f 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,229 +1,239 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import sys
import json
from pathlib import PosixPath
from typing import Any, Dict, Tuple, Iterable
from enum import Enum
import ndjson
from .plot import generate_sunburst, offline_plot
from .exceptions import InvalidObjectType
from swh.model.identifiers import DIRECTORY, CONTENT
class Color(Enum):
    """Terminal escape sequences used to colour the text output."""

    blue = "\033[94m"
    green = "\033[92m"
    red = "\033[91m"
    # appended after coloured text by colorize() to close the coloured span
    end = "\033[0m"
def colorize(text: str, color: Color):
    """Return ``text`` prefixed by the sequence of ``color`` and followed by
    the ``Color.end`` sequence.
    """
    pieces = (color.value, text, Color.end.value)
    return "".join(pieces)
class Tree:
    """Representation of a file system structure.

    Every node stores its path, its object type (``DIRECTORY`` or ``CONTENT``,
    decided from the path when the node is created), its Software Heritage
    persistent identifier, its known/unknown status and its direct children
    keyed by path.
    """

    def __init__(self, path: PosixPath, father: "Tree" = None):
        self.father = father
        self.path = path
        self.otype = DIRECTORY if path.is_dir() else CONTENT
        self.swhid = ""
        self.known = False
        self.children: Dict[PosixPath, "Tree"] = {}

    def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
        """Recursively add a new path.

        The tree is descended one path component at a time, creating the
        intermediate nodes that do not exist yet. When ``path`` is the path of
        the current node, its ``swhid`` and ``known`` values are stored.

        Args:
            path: path to add; it must be located under the path of this node
            swhid: persistent identifier to record for ``path``
            known: known/unknown status to record for ``path``
        """
        relative_path = path.relative_to(self.path)

        if relative_path == PosixPath("."):
            # we reached the node that represents ``path``
            self.swhid = swhid
            self.known = known
            return

        # first component below the current node: the child to descend into
        new_path = self.path.joinpath(relative_path.parts[0])
        if new_path not in self.children:
            self.children[new_path] = Tree(new_path, self)

        self.children[new_path].addNode(path, swhid, known)

    def show(self, format) -> None:
        """Show tree in different formats.

        Args:
            format: one of "json", "ndjson", "text" or "sunburst"; any other
                value is silently ignored.
        """
        if format == "json":
            print(json.dumps(self.toDict(), indent=4, sort_keys=True))

        elif format == "ndjson":
            print(ndjson.dumps(self.__iterNodesAttr()))

        elif format == "text":
            isatty = sys.stdout.isatty()
            root_label = str(self.path)

            # colours are only emitted when writing to a terminal
            print(colorize(root_label, Color.blue) if isatty else root_label)
            self.printChildren(isatty)

        elif format == "sunburst":
            root = self.path
            directories = self.getDirectoriesInfo(root)
            sunburst = generate_sunburst(directories, root)
            offline_plot(sunburst)

    def printChildren(self, isatty: bool, inc: int = 1) -> None:
        """Print every descendant of the current node, one per line.

        Args:
            isatty: whether the output may be coloured
            inc: depth of the printed children, used for the line prefix
        """
        for node in self.children.values():
            self.printNode(node, isatty, inc)
            if node.children:
                node.printChildren(isatty, inc + 1)

    def printNode(self, node: Any, isatty: bool, inc: int) -> None:
        """Print a single child of the current node.

        The line is made of ``inc`` prefixes, the path of ``node`` relative to
        the current node and a trailing "/" for directories. When ``isatty``
        is true, the path is red if the node is unknown, blue if it is a known
        directory and green if it is a known content.
        """
        rel_path = str(node.path.relative_to(self.path))
        begin = "│ " * inc
        end = "/" if node.otype == DIRECTORY else ""

        if isatty:
            if not node.known:
                rel_path = colorize(rel_path, Color.red)
            elif node.otype == DIRECTORY:
                rel_path = colorize(rel_path, Color.blue)
            elif node.otype == CONTENT:
                rel_path = colorize(rel_path, Color.green)

        print(f"{begin}{rel_path}{end}")

    @property
    def attributes(self):
        """
        Get the attributes of the current node grouped by its path.

        Returns:
            a dictionary containing the node path (as a string) as key and its
            known/unknown status and the Software Heritage persistent
            identifier as values.
        """
        return {str(self.path): {"swhid": self.swhid, "known": self.known}}

    def toDict(self, dict_nodes=None) -> Dict[str, Dict[str, Dict]]:
        """
        Recursively groups the descendants of the current node inside a
        dictionary. The current node itself is not part of the result.

        For example, if you have the following structure:

        .. code-block:: none

            root {
                subdir: {
                    file.txt
                }
            }

        Calling ``toDict`` on the ``root`` node will generate:

        .. code-block:: none

            {
                "root/subdir": {
                    "swhid": "...",
                    "known": True/False
                }
                "root/subdir/file.txt": {
                    "swhid": "...",
                    "known": True/False
                }
            }

        Args:
            dict_nodes: optional dictionary to update in place; a new
                dictionary is created when it is omitted.

        Returns:
            the updated (or newly created) dictionary.
        """
        # a fresh dictionary per call: a ``{}`` default would be shared by
        # every call and accumulate the nodes of unrelated trees
        if dict_nodes is None:
            dict_nodes = {}

        for node_dict in self.__iterNodesAttr():
            dict_nodes.update(node_dict)
        return dict_nodes

    def iterate(self) -> Iterable["Tree"]:
        """
        Recursively iterate through the children of the current node

        Yields:
            every descendant node, each directory being followed by its own
            descendants
        """
        for child_node in self.children.values():
            yield child_node
            if child_node.otype == DIRECTORY:
                yield from child_node.iterate()

    def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]:
        """
        Recursively iterate through the children of the current node returning
        an iterable of the children nodes attributes

        Yields:
            a dictionary containing a path with its known/unknown status and the
            Software Heritage persistent identifier
        """
        # iterate() already walks the whole subtree: recursing here as well
        # would yield the attributes of nested nodes more than once
        for node in self.iterate():
            yield node.attributes

    def __getSubDirsInfo(self, root, directories):
        """Fills the directories given in input with the contents information
        stored inside the directory child, only if they have contents.

        Args:
            root: path the keys of ``directories`` are made relative to
            directories: dictionary updated in place
        """
        for path, child_node in self.children.items():
            if child_node.otype == DIRECTORY:
                rel_path = path.relative_to(root)
                contents_info = child_node.count_contents()
                # checks the first element of the tuple
                # (the number of contents in a directory)
                # if it is equal to zero it means that there are no contents
                # in that directory.
                if contents_info[0] != 0:
                    directories[rel_path] = contents_info

                if child_node.has_dirs():
                    child_node.__getSubDirsInfo(root, directories)

    def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
        """Get information about all directories under the given root.

        Returns:
            A dictionary with a directory path as key and the relative
            contents information (the result of count_contents) as values.
            The entry of ``root`` is always present; sub-directories are
            present only if they have contents.
        """
        directories = {root: self.count_contents()}
        self.__getSubDirsInfo(root, directories)
        return directories

    def count_contents(self) -> Tuple[int, int]:
        """Count how many contents are present inside a directory.

        Only the direct children of the directory are counted. A directory
        that is itself known is reported as ``(1, 1)``, which identifies a
        directory with all its files/directories present.

        Returns:
            A tuple with the total number of the contents and the number
            of contents known (the ones that have a persistent identifier).

        Raises:
            InvalidObjectType: if the current node is not a directory.
        """
        if self.otype != DIRECTORY:
            raise InvalidObjectType(
                "Can't calculate contents of the object type: %s" % self.otype
            )

        if self.known:
            # to identify a directory with all files/directories present
            return (1, 1)

        contents = 0
        discovered = 0
        for child_node in self.children.values():
            if child_node.otype == CONTENT:
                contents += 1
                if child_node.known:
                    discovered += 1

        return (contents, discovered)

    def has_dirs(self) -> bool:
        """Checks if node has directories among its direct children."""
        return any(
            child_node.otype == DIRECTORY for child_node in self.children.values()
        )
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index b9bc544..712e28d 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,106 +1,106 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import json
from pathlib import PosixPath
from .data import correct_api_response, present_swhids, to_exclude_swhid
from swh.scanner.scanner import pids_discovery, get_subpaths, run
from swh.scanner.model import Tree
from swh.scanner.cli import extract_regex_objs
from swh.scanner.exceptions import APIError
# URL registered on the mock_aioresponse fixture by the tests below.
# NOTE(review): those tests hand "http://example.org/api/" to pids_discovery, so
# it presumably appends "known/" to the base URL — confirm in swh.scanner.scanner.
aio_url = "http://example.org/api/known/"
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """pids_discovery must return the JSON body served by the mocked URL."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    discovery = pids_discovery([], aiosession, "http://example.org/api/")
    actual_result = event_loop.run_until_complete(discovery)

    assert correct_api_response == actual_result
def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """A 413 status served by the mocked URL must raise APIError."""
    base_url = "http://example.org/api/"
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        event_loop.run_until_complete(pids_discovery([], aiosession, base_url))
def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """A request carrying 901 identifiers must raise APIError."""
    api_url = live_server.url() + "/"

    # /known/ is limited at 900
    request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"] * 901

    with pytest.raises(APIError):
        event_loop.run_until_complete(pids_discovery(request, aiosession, api_url))
def test_scanner_get_subpaths(temp_folder):
    """get_subpaths must report exactly two real directories under the root."""
    root = temp_folder["root"]

    # also check if it's a symlink since pytest tmp_dir fixture create
    # also a symlink to each directory inside the tmp_dir path
    actual_result = [
        (subpath, pid)
        for subpath, pid in get_subpaths(root, tuple())
        if subpath.is_dir() and not subpath.is_symlink()
    ]

    assert len(actual_result) == 2
@pytest.mark.options(debug=False)
def test_app(app):
    """With the ``options(debug=False)`` marker the app must not be in debug mode."""
    assert not app.debug
def test_scanner_result(live_server, event_loop, test_folder):
    """After a run, a node is flagged known exactly when its swhid is listed
    in present_swhids.
    """
    api_url = live_server.url() + "/"
    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))
    source_tree = Tree(sample_folder)

    event_loop.run_until_complete(run(sample_folder, api_url, source_tree, set()))

    for child_node in source_tree.iterate():
        # attributes holds a single entry: {path: {"swhid": ..., "known": ...}}
        node_info = next(iter(child_node.attributes.values()))
        expected_known = node_info["swhid"] in present_swhids
        assert node_info["known"] is expected_known
def test_scanner_result_with_exclude_patterns(live_server, event_loop, test_folder):
    """No node of the scanned tree may carry the swhid of the excluded path."""
    api_url = live_server.url() + "/"
    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))

    patterns = (str(sample_folder) + "/toexclude",)
    exclude_pattern = set(extract_regex_objs(sample_folder, patterns))

    source_tree = Tree(sample_folder)
    scan = run(sample_folder, api_url, source_tree, exclude_pattern)
    event_loop.run_until_complete(scan)

    for child_node in source_tree.iterate():
        # attributes holds a single entry: {path: {"swhid": ..., "known": ...}}
        node_info = next(iter(child_node.attributes.values()))
        assert node_info["swhid"] != to_exclude_swhid
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:01 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3273810
Attached To
rDTSCN Code scanner
Event Timeline
Log In to Comment