diff --git a/requirements-test.txt b/requirements-test.txt
index c01dc5e..04a3b9a 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,11 +1,8 @@
pytest
aioresponses
pytest_asyncio
pytest_flask
-plotly
-pandas
-numpy
swh.core[testing-core]
swh.model[testing]
swh.storage[testing]
swh.web[testing]
diff --git a/requirements.txt b/requirements.txt
index c076103..6800cb7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,10 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
vcversioner
requests
aiohttp
+plotly
+pandas
+numpy
dulwich
diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
index 6dc6a65..82d7ffa 100644
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -1,55 +1,56 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import asyncio
from pathlib import PosixPath
from .scanner import run
from .model import Tree
from swh.core.cli import CONTEXT_SETTINGS
@click.group(name='scanner', context_settings=CONTEXT_SETTINGS)
@click.pass_context
def scanner(ctx):
'''Software Heritage Scanner tools.'''
pass
def parse_url(url):
if not url.startswith('https://'):
url = 'https://' + url
if not url.endswith('/'):
url += '/'
return url
@scanner.command(name='scan')
@click.argument('path', required=True, type=click.Path(exists=True))
@click.option('-u', '--api-url',
default='https://archive.softwareheritage.org/api/1',
metavar='API_URL', show_default=True,
help="url for the api request")
@click.option('-f', '--format',
- type=click.Choice(['text', 'json'], case_sensitive=False),
+ type=click.Choice(['text', 'json', 'sunburst'],
+ case_sensitive=False),
default='text',
help="select the output format")
@click.pass_context
def scan(ctx, path, api_url, format):
"""Scan a source code project to discover files and directories already
present in the archive"""
api_url = parse_url(api_url)
source_tree = Tree(PosixPath(path))
loop = asyncio.get_event_loop()
loop.run_until_complete(run(path, api_url, source_tree))
source_tree.show(format)
if __name__ == '__main__':
scan()
diff --git a/swh/scanner/exceptions.py b/swh/scanner/exceptions.py
index ca5b83e..e9e482b 100644
--- a/swh/scanner/exceptions.py
+++ b/swh/scanner/exceptions.py
@@ -1,14 +1,18 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+class InvalidObjectType(TypeError):
+ pass
+
+
class APIError(Exception):
def __str__(self):
return '"%s"' % self.args
def error_response(reason: str, status_code: int, api_url: str):
error_msg = f'{status_code} {reason}: \'{api_url}\''
raise APIError(error_msg)
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 620c4c4..7d647a8 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,165 +1,181 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import sys
import json
from pathlib import PosixPath
-from typing import Any, Dict, List
+from typing import Any, Dict, Tuple
from enum import Enum
from .plot import sunburst
+from .exceptions import InvalidObjectType
from swh.model.identifiers import (
DIRECTORY, CONTENT
)
class Color(Enum):
blue = '\033[94m'
green = '\033[92m'
red = '\033[91m'
end = '\033[0m'
def colorize(text: str, color: Color):
return color.value + text + Color.end.value
class Tree:
"""Representation of a file system structure
"""
def __init__(self, path: PosixPath, father: Tree = None):
self.father = father
self.path = path
self.otype = DIRECTORY if path.is_dir() else CONTENT
self.pid = ''
self.children: Dict[PosixPath, Tree] = {}
def addNode(self, path: PosixPath, pid: str = None) -> None:
"""Recursively add a new path.
"""
relative_path = path.relative_to(self.path)
if relative_path == PosixPath('.'):
if pid is not None:
self.pid = pid
return
new_path = self.path.joinpath(relative_path.parts[0])
if new_path not in self.children:
self.children[new_path] = Tree(new_path, self)
self.children[new_path].addNode(path, pid)
def show(self, format) -> None:
"""Show tree in different formats"""
if format == 'json':
print(json.dumps(self.getTree(), indent=4, sort_keys=True))
elif format == 'text':
isatty = sys.stdout.isatty()
print(colorize(str(self.path), Color.blue) if isatty
else str(self.path))
self.printChildren(isatty)
elif format == 'sunburst':
root = self.path
- directories = {root: self.count_contents()}
- directories = self.getDirectoriesInfo(directories, root)
+ directories = self.getDirectoriesInfo(root)
sunburst(directories, root)
def printChildren(self, isatty: bool, inc: int = 1) -> None:
for path, node in self.children.items():
self.printNode(node, isatty, inc)
if node.children:
node.printChildren(isatty, inc+1)
def printNode(self, node: Any, isatty: bool, inc: int) -> None:
rel_path = str(node.path.relative_to(self.path))
begin = '│ ' * inc
end = '/' if node.otype == DIRECTORY else ''
if isatty:
if not node.pid:
rel_path = colorize(rel_path, Color.red)
elif node.otype == DIRECTORY:
rel_path = colorize(rel_path, Color.blue)
elif node.otype == CONTENT:
rel_path = colorize(rel_path, Color.green)
print(f'{begin}{rel_path}{end}')
def getTree(self):
"""Walk through the tree to discover content or directory that have
a persistent identifier. If a persistent identifier is found it saves
the path with the relative PID.
Returns:
child_tree: the tree with the content/directory found
"""
child_tree = {}
for path, child_node in self.children.items():
rel_path = str(child_node.path.relative_to(self.path))
if child_node.pid:
child_tree[rel_path] = child_node.pid
else:
next_tree = child_node.getTree()
if next_tree:
child_tree[rel_path] = next_tree
return child_tree
- def getDirectoriesInfo(self, directories, root) -> Dict[PosixPath, List]:
- """Get information about all directories stored inside the tree.
-
- Returns:
- A dictionary with the path as key and the contents information
- as values.
-
+ def __getSubDirsInfo(self, root, directories):
+ """Fill the directories given in input with the contents information
+ of each child directory, only for those that have contents.
"""
for path, child_node in self.children.items():
if child_node.otype == DIRECTORY:
rel_path = path.relative_to(root)
contents_info = child_node.count_contents()
+ # checks the first element of the tuple
+ # (the number of contents in a directory)
+ # if it is equal to zero it means that there are no contents
+ # in that directory.
if not contents_info[0] == 0:
directories[rel_path] = contents_info
if child_node.has_dirs():
- child_node.getDirectoriesInfo(directories, root)
+ child_node.__getSubDirsInfo(root, directories)
+
+ def getDirectoriesInfo(self, root: PosixPath
+ ) -> Dict[PosixPath, Tuple[int, int]]:
+ """Get information about all directories under the given root.
+
+ Returns:
+ A dictionary with a directory path as key and the corresponding
+ contents information (the result of count_contents) as value.
+ """
+ directories = {root: self.count_contents()}
+ self.__getSubDirsInfo(root, directories)
return directories
- def count_contents(self) -> List[int]:
+ def count_contents(self) -> Tuple[int, int]:
"""Count how many contents are present inside a directory.
If a directory has a pid returns as it has all the contents.
Returns:
- A list with the number of contents / discovered contents.
+ A tuple with the total number of the contents and the number
+ of contents known (the ones that have a persistent identifier).
"""
contents = 0
discovered = 0
- # to identificate a directory with all files/directories present
- if self.otype == DIRECTORY and self.pid:
- return [1, 1]
-
- for _, child_node in self.children.items():
- if child_node.otype == CONTENT:
- contents += 1
- if child_node.pid:
- discovered += 1
-
- return [contents, discovered]
+ if not self.otype == DIRECTORY:
+ raise InvalidObjectType('Can\'t calculate contents of the '
+ 'object type: %s' % self.otype)
+
+ if self.pid:
+ # to identify a directory with all files/directories present
+ return (1, 1)
+ else:
+ for _, child_node in self.children.items():
+ if child_node.otype == CONTENT:
+ contents += 1
+ if child_node.pid:
+ discovered += 1
+
+ return (contents, discovered)
def has_dirs(self) -> bool:
"""Checks if node has directories
"""
for _, child_node in self.children.items():
if child_node.otype == DIRECTORY:
return True
return False
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
new file mode 100644
index 0000000..ae4d467
--- /dev/null
+++ b/swh/scanner/plot.py
@@ -0,0 +1,264 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+The purpose of this module is to display and to interact with the result of the
+scanner contained in the model.
+
+The `sunburst` function generates a navigable sunburst chart from the
+directories information retrieved from the model. The chart displays for
+each directory the total number of files and the percentage of known files.
+
+The size of the directory is defined by the total number of contents whereas
+the color gradient is generated relying on the percentage of contents known.
+"""
+
+from typing import List, Dict, Tuple
+from pathlib import PosixPath
+
+from plotly.offline import offline # type: ignore
+import plotly.graph_objects as go # type: ignore
+import pandas as pd # type: ignore
+import numpy as np # type: ignore
+
+
+def build_hierarchical_df(
+ dirs_dataframe: pd.DataFrame, levels: List[str],
+ metrics_columns: List[str], root_name: str) -> pd.DataFrame:
+ """
+ Build a hierarchy of levels for Sunburst or Treemap charts.
+
+ For each directory the new dataframe will have the following
+ information:
+
+ id: the directory name
+ parent: the parent directory of id
+ contents: the total number of contents of the directory id and
+ of its subdirectories
+ known: the percentage of known contents relative to the computed
+ 'contents'
+
+ Example:
+ Given the following dataframe:
+
+ .. code-block:: none
+
+ lev0 lev1 contents known
+ '' '' 20 2 //root
+ kernel kernel/subdirker 5 0
+ telnet telnet/subdirtel 10 4
+
+ The output hierarchical dataframe will be like the following:
+
+ .. code-block:: none
+
+ id parent contents known
+ 20 10.00
+ kernel/subdirker kernel 5 0.00
+ telnet/subdirtel telnet 10 40.00
+ total 20 10.00
+ kernel total 5 0.00
+ telnet total 10 40.00
+ total 35 17.14
+
+ To create the hierarchical dataframe we need to iterate through
+ the dataframe given in input relying on the number of levels.
+
+ Based on the previous example we have to do two iterations:
+
+ iteration 1
+ The generated dataframe 'df_tree' will be:
+
+ .. code-block:: none
+
+ id parent contents known
+ 20 10.0
+ kernel/subdirker kernel 5 0.0
+ telnet/subdirtel telnet 10 40.0
+
+ iteration 2
+ The generated dataframe 'df_tree' will be:
+
+ .. code-block:: none
+
+ id parent contents known
+ total 20 10.0
+ kernel total 5 0.0
+ telnet total 10 40.0
+
+ Note that since we have reached the last level, the parent given
+ to the directory id is the directory root.
+
+ The 'total' row is computed by summing the number of contents of the
+ dataframe given in input and taking the percentage of known contents
+ over the total number of contents.
+
+ """
+ def compute_known_percentage(contents: pd.Series, known: pd.Series
+ ) -> pd.Series:
+ """Compute the percentage of known contents and generate the new
+ known column with the percentage values.
+
+ It also ensures that if there are no contents inside a directory
+ the percentage is zero.
+
+ """
+ known_values = []
+ for idx, content_val in enumerate(contents):
+ if content_val == 0:
+ known_values.append(0)
+ else:
+ percentage = known[idx] / contents[idx] * 100
+ known_values.append(percentage)
+
+ return pd.Series(np.array(known_values))
+
+ complete_df = pd.DataFrame(columns=['id', 'parent', 'contents', 'known'])
+ # reverse the level order to start from the deepest
+ levels = [level for level in reversed(levels)]
+ contents_col = metrics_columns[0]
+ known_col = metrics_columns[1]
+
+ df_tree_list = []
+ for i, level in enumerate(levels):
+ df_tree = pd.DataFrame(columns=['id', 'parent', 'contents', 'known'])
+ dfg = dirs_dataframe.groupby(levels[i:]).sum()
+ dfg = dfg.reset_index()
+ df_tree['id'] = dfg[level].copy()
+ if i < len(levels) - 1:
+ # copy the parent directories (one level above)
+ df_tree['parent'] = dfg[levels[i+1]].copy()
+ else:
+ # last level reached
+ df_tree['parent'] = root_name
+
+ # copy the contents column
+ df_tree['contents'] = dfg[contents_col]
+ # compute the percentage relative to the contents
+ df_tree['known'] = compute_known_percentage(
+ dfg[contents_col], dfg[known_col])
+
+ df_tree_list.append(df_tree)
+
+ complete_df = complete_df.append(df_tree_list, ignore_index=True)
+
+ # create the main parent
+ total_contents = dirs_dataframe[contents_col].sum()
+ total_known = dirs_dataframe[known_col].sum()
+ total_avg = total_known / total_contents * 100
+
+ total = pd.Series(dict(id=root_name, parent='',
+ contents=total_contents,
+ known=total_avg))
+
+ complete_df = complete_df.append(total, ignore_index=True)
+
+ return complete_df
+
+
+def compute_max_depth(dirs_path: List[PosixPath], root: PosixPath) -> int:
+ """Compute the maximum depth level of the given directory paths.
+
+ Example: for `var/log/kernel/` the depth level is 3
+
+ """
+ max_depth = 0
+ for dir_path in dirs_path:
+ if dir_path == root:
+ continue
+
+ dir_depth = len(dir_path.parts)
+ if dir_depth > max_depth:
+ max_depth = dir_depth
+
+ return max_depth
+
+
+def generate_df_from_dirs(dirs: Dict[PosixPath, Tuple[int, int]],
+ columns: List[str], root: PosixPath, max_depth: int
+ ) -> pd.DataFrame:
+ """Generate a dataframe from the directories given in input.
+
+ Example:
+ given the following directories as input
+
+ .. code-block:: python
+
+ dirs = {
+ '/var/log/': (23, 2),
+ '/var/log/kernel': (5, 0),
+ '/var/log/telnet': (10, 3)
+ }
+
+ The generated dataframe will be:
+
+ .. code-block:: none
+
+ lev0 lev1 lev2 contents known
+ 'var' 'var/log' '' 23 2
+ 'var' 'var/log' 'var/log/kernel' 5 0
+ 'var' 'var/log' 'var/log/telnet' 10 3
+
+ """
+ def get_parents(path: PosixPath):
+ parts = path.parts[1:] if path.parts[0] == '/' else path.parts
+
+ for i in range(1, len(parts)+1):
+ yield '/'.join(parts[0:i])
+
+ def get_dirs_array():
+ for dir_path, contents_info in dirs.items():
+ empty_lvl = max_depth - len(dir_path.parts)
+
+ if dir_path == root:
+ # ignore the root but store contents information
+ yield ['']*(max_depth) + list(contents_info)
+ else:
+ yield list(get_parents(dir_path)) + \
+ ['']*empty_lvl + \
+ list(contents_info)
+
+ df = pd.DataFrame(np.array(
+ [dir_array for dir_array in get_dirs_array()]), columns=columns)
+
+ df['contents'] = pd.to_numeric(df['contents'])
+ df['known'] = pd.to_numeric(df['known'])
+
+ return df
+
+
+def sunburst(directories: Dict[PosixPath, Tuple[int, int]],
+ root: PosixPath) -> None:
+ """Show the sunburst chart from the directories given in input.
+
+ """
+ max_depth = compute_max_depth(list(directories.keys()), root)
+ metrics_columns = ['contents', 'known']
+ levels_columns = ['lev'+str(i) for i in range(max_depth)]
+
+ df_columns = levels_columns + metrics_columns
+ dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth)
+
+ hierarchical_df = build_hierarchical_df(
+ dirs_df, levels_columns, metrics_columns, str(root))
+ known_avg = dirs_df['known'].sum() / dirs_df['contents'].sum()
+
+ fig = go.Figure()
+ fig.add_trace(go.Sunburst(
+ labels=hierarchical_df['id'],
+ parents=hierarchical_df['parent'],
+ values=hierarchical_df['contents'],
+ branchvalues='total',
+ marker=dict(
+ colors=hierarchical_df['known'],
+ colorscale='RdBu',
+ cmid=known_avg),
+ hovertemplate='''%{label}
+
Files: %{value}
+
Known: %{color:.2f}%''',
+ name=''
+ ))
+
+ offline.plot(fig, filename='sunburst.html')
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
index 3120409..a932de1 100644
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -1,101 +1,136 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import asyncio
import aiohttp
import os
from pathlib import PosixPath
from aioresponses import aioresponses # type: ignore
from swh.model.cli import pid_of_file, pid_of_dir
+from swh.scanner.model import Tree
from .flask_api import create_app
@pytest.fixture
def mock_aioresponse():
with aioresponses() as m:
yield m
@pytest.fixture
def event_loop():
"""Fixture that generate an asyncio event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.fixture
async def aiosession():
"""Fixture that generate an aiohttp Client Session."""
session = aiohttp.ClientSession()
yield session
session.detach()
@pytest.fixture(scope='session')
def temp_folder(tmp_path_factory):
"""Fixture that generates a temporary folder with the following
structure:
.. code-block:: python
root = {
subdir: {
subsubdir
filesample.txt
filesample2.txt
}
subdir2
subfile.txt
}
"""
root = tmp_path_factory.getbasetemp()
subdir = tmp_path_factory.mktemp('subdir')
subsubdir = subdir.joinpath('subsubdir')
subsubdir.mkdir()
subdir2 = tmp_path_factory.mktemp('subdir2')
subfile = root / 'subfile.txt'
subfile.touch()
filesample = subdir / 'filesample.txt'
filesample.touch()
filesample2 = subdir / 'filesample2.txt'
filesample2.touch()
avail_path = {
subdir: pid_of_dir(bytes(subdir)),
subsubdir: pid_of_dir(bytes(subsubdir)),
subdir2: pid_of_dir(bytes(subdir2)),
subfile: pid_of_file(bytes(subfile)),
filesample: pid_of_file(bytes(filesample)),
filesample2: pid_of_file(bytes(filesample2))
}
return {
'root': root,
'paths': avail_path,
'filesample': filesample,
'filesample2': filesample2,
'subsubdir': subsubdir,
'subdir': subdir
}
-@pytest.fixture(scope='session')
-def app():
- """Flask backend API (used by live_server)."""
- app = create_app()
- return app
+@pytest.fixture(scope='function')
+def example_tree(temp_folder):
+ """Fixture that generates a Tree with the root present in the
+ session fixture "temp_folder".
+ """
+ example_tree = Tree(temp_folder['root'])
+ assert example_tree.path == temp_folder['root']
+
+ return example_tree
+
+
+@pytest.fixture(scope='function')
+def example_dirs(example_tree, temp_folder):
+ """
+ Fixture that fills the fixture example_tree with the values contained in
+ the fixture temp_folder and returns the directories information of the
+ filled example_tree.
+
+ """
+ root = temp_folder['root']
+ filesample_path = temp_folder['filesample']
+ filesample2_path = temp_folder['filesample2']
+ subsubdir_path = temp_folder['subsubdir']
+ known_paths = [filesample_path, filesample2_path, subsubdir_path]
+
+ for path, pid in temp_folder['paths'].items():
+ if path in known_paths:
+ example_tree.addNode(path, pid)
+ else:
+ example_tree.addNode(path)
+
+ return example_tree.getDirectoriesInfo(root)
@pytest.fixture
def test_folder():
"""Location of the "data" folder """
tests_path = PosixPath(os.path.abspath(__file__)).parent
tests_data_folder = tests_path.joinpath('data')
assert tests_data_folder.exists()
return tests_data_folder
+
+
+@pytest.fixture(scope='session')
+def app():
+ """Flask backend API (used by live_server)."""
+ app = create_app()
+ return app
diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py
index e7c37c5..904356e 100644
--- a/swh/scanner/tests/test_model.py
+++ b/swh/scanner/tests/test_model.py
@@ -1,89 +1,71 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import pytest
-
-from swh.scanner.model import Tree
-
-
-@pytest.fixture(scope='function')
-def example_tree(temp_folder):
- """Fixture that generate a Tree with the root present in the
- session fixture "temp_folder".
- """
- example_tree = Tree(temp_folder['root'])
- assert example_tree.path == temp_folder['root']
-
- return example_tree
-
def test_tree_add_node(example_tree, temp_folder):
avail_paths = temp_folder['paths'].keys()
for path, pid in temp_folder['paths'].items():
example_tree.addNode(path, pid)
for path, node in example_tree.children.items():
assert path in avail_paths
if node.children:
for subpath, subnode in node.children.items():
assert subpath in avail_paths
def test_get_json_tree_all_not_present(example_tree, temp_folder):
for path, pid in temp_folder['paths'].items():
example_tree.addNode(path)
json_tree = example_tree.getTree()
assert len(json_tree) == 0
def test_get_json_tree_all_present(example_tree, temp_folder):
for path, pid in temp_folder['paths'].items():
example_tree.addNode(path, pid)
tree_dict = example_tree.getTree()
assert len(tree_dict) == 3
# since subdir have a pid, it can't have a children path
assert tree_dict['subdir0'] is not dict
def test_get_json_tree_only_one_present(example_tree, temp_folder):
filesample_path = temp_folder['filesample']
for path, pid in temp_folder['paths'].items():
if path == filesample_path:
example_tree.addNode(path, pid)
else:
example_tree.addNode(path)
tree_dict = example_tree.getTree()
assert len(tree_dict) == 1
assert tree_dict['subdir0']['filesample.txt']
def test_get_directories_info(example_tree, temp_folder):
root_path = temp_folder['root']
filesample_path = temp_folder['filesample']
filesample2_path = temp_folder['filesample2']
subdir_path = temp_folder['subdir'].relative_to(root_path)
subsubdir_path = temp_folder['subsubdir'].relative_to(root_path)
for path, pid in temp_folder['paths'].items():
if path == filesample_path or path == filesample2_path:
- print(path)
example_tree.addNode(path, pid)
else:
example_tree.addNode(path)
- tree_root = example_tree
- directories = {tree_root.path: tree_root.count_contents()}
- directories = tree_root.getDirectoriesInfo(directories, tree_root.path)
+ directories = example_tree.getDirectoriesInfo(example_tree.path)
assert subsubdir_path not in directories
- assert directories[subdir_path] == [2, 2]
+ assert directories[subdir_path] == (2, 2)
diff --git a/swh/scanner/tests/test_plot.py b/swh/scanner/tests/test_plot.py
new file mode 100644
index 0000000..a1eb56e
--- /dev/null
+++ b/swh/scanner/tests/test_plot.py
@@ -0,0 +1,56 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scanner.plot import (
+ compute_max_depth, generate_df_from_dirs, build_hierarchical_df
+)
+
+
+def test_max_depth(temp_folder, example_dirs):
+ root = temp_folder['root']
+ max_depth = compute_max_depth(example_dirs, root)
+ assert max_depth == 2
+
+
+def test_generate_df_from_dirs(temp_folder, example_dirs):
+ root = temp_folder['root']
+ max_depth = compute_max_depth(example_dirs, root)
+ metrics_columns = ['contents', 'known']
+ levels_columns = ['lev'+str(i) for i in range(max_depth)]
+ df_columns = levels_columns + metrics_columns
+
+ actual_df = generate_df_from_dirs(
+ example_dirs, df_columns, root, max_depth)
+
+ # assert root is empty
+ assert actual_df['lev0'][0] == ''
+ assert actual_df['lev1'][0] == ''
+
+ # assert subdir has correct contents information
+ assert actual_df['contents'][1] == 2
+ assert actual_df['known'][1] == 2
+
+ # assert subsubdir has correct level information
+ assert actual_df['lev0'][2] == 'subdir0'
+ assert actual_df['lev1'][2] == 'subdir0/subsubdir'
+
+
+def test_build_hierarchical_df(temp_folder, example_dirs):
+ root = temp_folder['root']
+ max_depth = compute_max_depth(example_dirs, root)
+ metrics_columns = ['contents', 'known']
+ levels_columns = ['lev'+str(i) for i in range(max_depth)]
+ df_columns = levels_columns + metrics_columns
+
+ actual_df = generate_df_from_dirs(
+ example_dirs, df_columns, root, max_depth)
+
+ actual_result = build_hierarchical_df(
+ actual_df, levels_columns, metrics_columns, root)
+
+ assert actual_result['parent'][1] == 'subdir0'
+ assert actual_result['contents'][1] == 2
+ assert actual_result['id'][5] == root
+ assert actual_result['known'][5] == 75