diff --git a/requirements-swh.txt b/requirements-swh.txt
index 850ffb6..c34a185 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core >= 0.3
-swh.model >= 0.3.8
+swh.model >= 2.3.0
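The swh.model bump to >= 2.3.0 provides the CoreSWHID / ObjectType API that the changes below migrate to, replacing the swhid() and parse_swhid() helpers used before. A minimal sketch of the new API, using an arbitrary example digest:

    from swh.model.identifiers import CoreSWHID, ObjectType

    # build a core SWHID from an object type and its intrinsic identifier
    dir_swhid = CoreSWHID(
        object_type=ObjectType.DIRECTORY,
        object_id=bytes.fromhex("94a9ed024d3859793618152ea559a168bbcbb5e2"),
    )
    assert str(dir_swhid) == "swh:1:dir:94a9ed024d3859793618152ea559a168bbcbb5e2"

    # parse a SWHID back from its string form
    assert CoreSWHID.from_string(str(dir_swhid)) == dir_swhid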
diff --git a/swh/scanner/benchmark_algos.py b/swh/scanner/benchmark_algos.py
index 6a1d0dd..4523c66 100644
--- a/swh/scanner/benchmark_algos.py
+++ b/swh/scanner/benchmark_algos.py
@@ -1,417 +1,412 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import itertools
import json
import logging
import os
from pathlib import Path
import random
+import time
from typing import Dict, Iterable, List, Optional
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from swh.model.from_disk import Content, Directory, accept_all_directories
-from swh.model.identifiers import CONTENT, DIRECTORY, swhid
+from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID, ObjectType
from .exceptions import APIError
from .model import Status, Tree
from .scanner import directory_filter, extract_regex_objs
session = requests.Session()
retries_rule = Retry(total=5, backoff_factor=1)
session.mount("http://", HTTPAdapter(max_retries=retries_rule))
def query_swhids(
swhids: List[Tree], api_url: str, counter: Optional[collections.Counter] = None
) -> Dict[str, Dict[str, bool]]:
"""
Returns:
A dictionary with:
key(str): persistent identifier
value(dict):
value['known'] = True if pid is found
value['known'] = False if pid is not found
"""
endpoint = api_url + "known/"
chunk_size = 1000
if counter:
counter["queries"] += len(swhids)
def make_request(swhids):
swhids = [swhid.swhid for swhid in swhids]
req = session.post(endpoint, json=swhids)
if req.status_code != 200:
error_message = "%s with given values %s" % (req.text, str(swhids))
raise APIError(error_message)
if counter:
counter["api_calls"] += 1
resp = req.text
return json.loads(resp)
def get_chunk(swhids):
for i in range(0, len(swhids), chunk_size):
yield swhids[i : i + chunk_size]
if len(swhids) > chunk_size:
return dict(
itertools.chain.from_iterable(
make_request(swhids_chunk).items() for swhids_chunk in get_chunk(swhids)
)
)
else:
return make_request(swhids)
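# Illustrative sketch (hypothetical digests): for two Tree nodes, query_swhids
# returns a mapping such as
#     {
#         "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a": {"known": True},
#         "swh:1:dir:02b626051d43a23f0b45b60ccda8bd5ef4f3f6b6": {"known": False},
#     }
# requests are batched in chunks of at most chunk_size (1000) SWHIDs per API call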
def stopngo(source_tree: Tree, api_url: str, counter: collections.Counter):
nodes = []
nodes.append(source_tree)
while len(nodes) > 0:
parsed_nodes = query_swhids(nodes, api_url, counter)
for node in nodes.copy():
nodes.remove(node)
node.known = parsed_nodes[node.swhid]["known"]
node.status = Status.queried
if node.otype == DIRECTORY:
if not node.known:
nodes.extend(list(node.children.values()))
else:
set_children_status(node, [CONTENT, DIRECTORY], True)
def set_father_status(node, known):
"""
Recursively change father known and visited status of a given node
"""
parent = node.father
if parent is None:
return
if parent.status != Status.unset:
return
parent.known = known
set_father_status(parent, known)
def set_children_status(
node: Tree, node_types: Iterable[str], known: bool, status: Status = Status.unset
):
"""
Recursively change the status of the children of the provided node
"""
for child_node in node.iterate():
if child_node.otype in node_types and child_node.status == status:
child_node.known = known
def file_priority(source_tree: Tree, api_url: str, counter: collections.Counter):
# get all the files
all_contents = list(
filter(lambda node: node.otype == CONTENT, source_tree.iterate_bfs())
)
all_contents.reverse() # we check nodes from the deepest
# query the backend to get all file contents status
parsed_contents = query_swhids(all_contents, api_url, counter)
# set all the file contents status
for cnt in all_contents:
cnt.known = parsed_contents[cnt.swhid]["known"]
cnt.status = Status.queried
# set all the upstream directories of unknown file contents to unknown
if not cnt.known:
set_father_status(cnt, False)
# get all unset directories and check their status
# (update children directories accordingly)
unset_dirs = list(
filter(
lambda node: node.otype == DIRECTORY and node.status == Status.unset,
source_tree.iterate(),
)
)
if source_tree.status == Status.unset:
unset_dirs.append(source_tree)
# check unset directories
for dir_ in unset_dirs:
if dir_.status == Status.unset:
# update directory status
dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"]
dir_.status = Status.queried
if dir_.known:
set_children_status(dir_, [DIRECTORY], True)
def directory_priority(source_tree: Tree, api_url: str, counter: collections.Counter):
# get all directory contents that have at least one file content
unset_dirs = list(
filter(
lambda dir_: dir_.otype == DIRECTORY and dir_.has_contents,
source_tree.iterate_bfs(),
)
)
unset_dirs.reverse()
for dir_ in unset_dirs:
# if the directory is known set all the downstream file contents to known
if dir_.status == Status.unset:
dir_.known = query_swhids([dir_], api_url, counter)[dir_.swhid]["known"]
dir_.status = Status.queried
if dir_.known:
set_children_status(dir_, [CONTENT], True)
else:
set_father_status(dir_, False)
# get remaining directories that have no file contents
unset_dirs_no_cnts = list(
filter(
lambda node: node.otype == DIRECTORY and not node.has_contents,
source_tree.iterate_bfs(),
)
)
parsed_dirs_no_cnts = query_swhids(unset_dirs_no_cnts, api_url, counter)
# update status of directories that have no file contents
for dir_ in unset_dirs_no_cnts:
dir_.known = parsed_dirs_no_cnts[dir_.swhid]["known"]
dir_.status = Status.queried
# check unknown file contents
unset_files = list(
filter(
lambda node: node.otype == CONTENT and node.status == Status.unset,
source_tree.iterate(),
)
)
parsed_unset_files = query_swhids(unset_files, api_url, counter)
for file_ in unset_files:
file_.known = parsed_unset_files[file_.swhid]["known"]
file_.status = Status.queried
def random_(
source_tree: Tree,
api_url: str,
counter: collections.Counter,
seed: Optional[int] = None,
):
if seed:
random.seed(seed)
# get all directory/file contents
all_nodes = [node for node in source_tree.iterate()] + [source_tree]
# shuffle contents
random.shuffle(all_nodes)
while len(all_nodes):
node = all_nodes.pop()
if node.status != Status.unset:
continue
node.known = query_swhids([node], api_url, counter)[node.swhid]["known"]
node.status = Status.queried
if node.otype == DIRECTORY and node.known:
for child_node in node.iterate():
child_node.known = True
elif node.otype == CONTENT and not node.known:
set_father_status(node, False)
-def algo_min(source_tree: Tree, api_url: str):
+def algo_min(source_tree: Tree, api_url: str, counter: collections.Counter):
"""
The minimal number of queries knowing the known/unknown status of every node
"""
def change_parent_status(node, status):
parent = node.father
if parent is None:
return
parent.status = status
change_parent_status(parent, status)
def change_children_status(node, status):
for child_node in node.iterate():
child_node.status = status
all_nodes = [node for node in source_tree.iterate_bfs()]
parsed_nodes = query_swhids(all_nodes, api_url)
for node in all_nodes:
node.known = parsed_nodes[node.swhid]["known"]
unknown_cnts = list(
filter(lambda node: node.known is False, source_tree.iterate_bfs())
)
unknown_cnts.reverse()
for node in unknown_cnts:
change_parent_status(node, Status.unset)
known_dirs = list(
filter(
lambda node: node.otype == DIRECTORY and node.known is True,
source_tree.iterate(),
)
)
if source_tree.known:
known_dirs += [source_tree]
for dir_ in known_dirs:
change_children_status(dir_, Status.unset)
unset_cnts = list(
filter(lambda node: node.status == Status.unset, source_tree.iterate_bfs())
)
- return len(source_tree) - len(unset_cnts)
+ counter["api_calls"] = -1
+ counter["queries"] = len(source_tree) - len(unset_cnts)
def get_swhids(paths: Iterable[Path], exclude_patterns):
def swhid_of(path):
if path.is_dir():
if exclude_patterns:
def dir_filter(dirpath, *args):
return directory_filter(dirpath, exclude_patterns)
else:
dir_filter = accept_all_directories
obj = Directory.from_disk(
path=bytes(path), dir_filter=dir_filter
).get_data()
- return swhid(DIRECTORY, obj)
+ return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"]))
else:
obj = Content.from_file(path=bytes(path)).get_data()
- return swhid(CONTENT, obj)
+ return str(
+ CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"])
+ )
for path in paths:
yield str(path), swhid_of(path)
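# For illustration (hypothetical digest): get_swhids yields pairs such as
#     ("src", "swh:1:dir:0a1b2c3d4e5f60718293a4b5c6d7e8f9a0b1c2d3")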
def load_source(root, sre_patterns):
"""
Load the source code inside the Tree data structure
"""
def _scan(root_path, source_tree, sre_patterns):
files = []
dirs = []
for elem in os.listdir(root_path):
cnt = Path(root_path).joinpath(elem)
if not os.path.islink(cnt):
if os.path.isfile(cnt):
files.append(cnt)
elif os.path.isdir(cnt):
dirs.append(cnt)
if files:
parsed_file_swhids = dict(get_swhids(files, sre_patterns))
for path, swhid_ in parsed_file_swhids.items():
source_tree.add_node(Path(path), swhid_)
if dirs:
parsed_dirs_swhids = dict(get_swhids(dirs, sre_patterns))
for path, swhid_ in parsed_dirs_swhids.items():
if not directory_filter(path, sre_patterns):
continue
source_tree.add_node(Path(path), swhid_)
_scan(path, source_tree, sre_patterns)
source_tree = Tree(root)
root_swhid = dict(get_swhids([root], sre_patterns))
source_tree.swhid = root_swhid[str(root)]
_scan(root, source_tree, sre_patterns)
return source_tree
def run(
root: str,
api_url: str,
backend_name: str,
exclude_patterns: Iterable[str],
algo: str,
origin: str,
commit: str,
seed: Optional[int] = None,
):
sre_patterns = set()
if exclude_patterns:
sre_patterns = {
reg_obj for reg_obj in extract_regex_objs(Path(root), exclude_patterns)
}
    # the repo identifier is the prefix of the (temporary) directory name
repo_id = Path(root).parts[-1].split("_")[0]
counter: collections.Counter = collections.Counter()
counter["api_calls"] = 0
counter["queries"] = 0
source_tree = load_source(Path(root), sre_patterns)
logging.info(
f'started processing repo "{repo_id}" with algorithm '
f'"{algo}" and knowledge base "{backend_name}"'
)
+ tstart = time.time()
if algo == "random":
if seed:
random_(source_tree, api_url, counter, seed)
else:
random_(source_tree, api_url, counter)
elif algo == "algo_min":
- min_queries = algo_min(source_tree, api_url)
- min_result = (
- repo_id,
- origin,
- commit,
- backend_name,
- len(source_tree),
- algo,
- -1,
- min_queries,
- )
- print(*min_result, sep=",")
- return
+ algo_min(source_tree, api_url, counter)
elif algo == "stopngo":
stopngo(source_tree, api_url, counter)
elif algo == "file_priority":
file_priority(source_tree, api_url, counter)
elif algo == "directory_priority":
directory_priority(source_tree, api_url, counter)
else:
raise Exception(f'Algorithm "{algo}" not found')
+ tend = time.time()
result = (
repo_id,
origin,
commit,
backend_name,
len(source_tree),
algo,
counter["api_calls"],
counter["queries"],
+ tend - tstart,
)
logging.info(
f'finished processing repo "{repo_id}" with algorithm '
f'"{algo}" and knowledge base "{backend_name}"'
)
print(*result, sep=",")
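With the timing column added by this diff, each run now emits one CSV row with
the fields: repo_id, origin, commit, backend, tree size, algorithm, api_calls,
queries, elapsed seconds. A hypothetical example row:

    linux,https://example.org/linux.git,abc123,backend1,70325,stopngo,412,70325,13.37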
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
index ff14125..1f942a8 100644
--- a/swh/scanner/plot.py
+++ b/swh/scanner/plot.py
@@ -1,278 +1,278 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
The purpose of this module is to display and to interact with the result of the
scanner contained in the model.
The `sunburst` function generates a navigable sunburst chart from the
directories information retrieved from the model. The chart displays for
each directory the total number of files and the percentage of file known.
The size of the directory is defined by the total number of contents whereas
the color gradient is generated relying on the percentage of contents known.
"""
from pathlib import Path
from typing import Dict, List, Tuple
-import numpy as np # type: ignore
+import numpy as np
import pandas as pd # type: ignore
import plotly.graph_objects as go
from plotly.offline import offline
def build_hierarchical_df(
dirs_dataframe: pd.DataFrame,
levels: List[str],
metrics_columns: List[str],
root_name: str,
) -> pd.DataFrame:
"""
Build a hierarchy of levels for Sunburst or Treemap charts.
For each directory the new dataframe will have the following
information:
id: the directory name
parent: the parent directory of id
contents: the total number of contents of the directory id and
the relative subdirectories
known: the percentage of contents known relative to computed
'contents'
Example:
Given the following dataframe:
.. code-block:: none
lev0 lev1 contents known
'' '' 20 2 //root
kernel kernel/subdirker 5 0
telnet telnet/subdirtel 10 4
The output hierarchical dataframe will be like the following:
.. code-block:: none
id parent contents known
20 10.00
kernel/subdirker kernel 5 0.00
telnet/subdirtel telnet 10 40.00
total 20 10.00
kernel total 5 0.00
telnet total 10 40.00
total 35 17.14
To create the hierarchical dataframe we need to iterate through
the dataframe given in input relying on the number of levels.
Based on the previous example we have to do two iterations:
iteration 1
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
20 10.0
kernel/subdirker kernel 5 0.0
telnet/subdirtel telnet 10 40.0
iteration 2
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
total 20 10.0
kernel total 5 0.0
telnet total 10 40.0
Note that since we have reached the last level, the parent given
to the directory id is the directory root.
The 'total' row il computed by adding the number of contents of the
dataframe given in input and the average of the contents known on
the total number of contents.
"""
def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series:
"""This function compute the percentage of known contents and generate
the new known column with the percentage values.
It also assures that if there is no contents inside a directory
the percentage is zero
"""
known_values = []
for idx, content_val in enumerate(contents):
if content_val == 0:
known_values.append(0)
else:
percentage = known[idx] / contents[idx] * 100
known_values.append(percentage)
return pd.Series(np.array(known_values))
complete_df = pd.DataFrame(columns=["id", "parent", "contents", "known"])
    # reverse the level order to start from the deepest
levels = [level for level in reversed(levels)]
contents_col = metrics_columns[0]
known_col = metrics_columns[1]
df_tree_list = []
for i, level in enumerate(levels):
df_tree = pd.DataFrame(columns=["id", "parent", "contents", "known"])
dfg = dirs_dataframe.groupby(levels[i:]).sum()
dfg = dfg.reset_index()
df_tree["id"] = dfg[level].copy()
if i < len(levels) - 1:
# copy the parent directories (one level above)
df_tree["parent"] = dfg[levels[i + 1]].copy()
else:
# last level reached
df_tree["parent"] = root_name
# copy the contents column
df_tree["contents"] = dfg[contents_col]
# compute the percentage relative to the contents
df_tree["known"] = compute_known_percentage(dfg[contents_col], dfg[known_col])
df_tree_list.append(df_tree)
complete_df = complete_df.append(df_tree_list, ignore_index=True)
# create the main parent
total_contents = dirs_dataframe[contents_col].sum()
total_known = dirs_dataframe[known_col].sum()
total_avg = total_known / total_contents * 100
total = pd.Series(
dict(id=root_name, parent="", contents=total_contents, known=total_avg)
)
complete_df = complete_df.append(total, ignore_index=True)
return complete_df
def compute_max_depth(dirs_path: List[Path], root: Path) -> int:
"""Compute the maximum depth level of the given directory paths.
Example: for `var/log/kernel/` the depth level is 3
"""
max_depth = 0
for dir_path in dirs_path:
if dir_path == root:
continue
dir_depth = len(dir_path.parts)
if dir_depth > max_depth:
max_depth = dir_depth
return max_depth
def generate_df_from_dirs(
dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int,
) -> pd.DataFrame:
"""Generate a dataframe from the directories given in input.
Example:
given the following directories as input
.. code-block:: python
dirs = {
'/var/log/': (23, 2),
'/var/log/kernel': (5, 0),
'/var/log/telnet': (10, 3)
}
The generated dataframe will be:
.. code-block:: none
lev0 lev1 lev2 contents known
'var' 'var/log' '' 23 2
'var' 'var/log' 'var/log/kernel' 5 0
'var' 'var/log' 'var/log/telnet' 10 3
"""
def get_parents(path: Path):
parts = path.parts[1:] if path.parts[0] == "/" else path.parts
for i in range(1, len(parts) + 1):
yield "/".join(parts[0:i])
def get_dirs_array():
for dir_path, contents_info in dirs.items():
empty_lvl = max_depth - len(dir_path.parts)
if dir_path == root:
# ignore the root but store contents information
yield [""] * (max_depth) + list(contents_info)
else:
yield list(get_parents(dir_path)) + [""] * empty_lvl + list(
contents_info
)
df = pd.DataFrame(
np.array([dir_array for dir_array in get_dirs_array()]), columns=columns
)
df["contents"] = pd.to_numeric(df["contents"])
df["known"] = pd.to_numeric(df["known"])
return df
def generate_sunburst(
directories: Dict[Path, Tuple[int, int]], root: Path
) -> go.Sunburst:
"""Generate a sunburst chart from the directories given in input.
"""
max_depth = compute_max_depth(list(directories.keys()), root)
metrics_columns = ["contents", "known"]
levels_columns = ["lev" + str(i) for i in range(max_depth)]
df_columns = levels_columns + metrics_columns
dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth)
hierarchical_df = build_hierarchical_df(
dirs_df, levels_columns, metrics_columns, str(root)
)
sunburst = go.Sunburst(
labels=hierarchical_df["id"],
parents=hierarchical_df["parent"],
values=hierarchical_df["contents"],
branchvalues="total",
marker=dict(
colors=hierarchical_df["known"],
colorscale="matter",
cmid=50,
showscale=True,
),
hovertemplate="""%{label}
Files: %{value}
Known: %{color:.2f}%""",
name="",
)
return sunburst
def offline_plot(graph_object: go):
"""Plot a graph object to an html file
"""
fig = go.Figure()
fig.add_trace(graph_object)
offline.plot(fig, filename="chart.html")
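A minimal usage sketch of this module, reusing the directory mapping from the
generate_df_from_dirs docstring (directory path -> (total contents, known
contents); relative paths and counts are illustrative):

    from pathlib import Path

    from swh.scanner.plot import generate_sunburst, offline_plot

    directories = {
        Path("var/log"): (23, 2),
        Path("var/log/kernel"): (5, 0),
        Path("var/log/telnet"): (10, 3),
    }
    sunburst = generate_sunburst(directories, root=Path("var/log"))
    offline_plot(sunburst)  # writes the figure to chart.html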
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
index 903b87c..8abe331 100644
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -1,245 +1,249 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import fnmatch
import glob
import itertools
import os
from pathlib import Path
import re
from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union
import aiohttp
from swh.model.from_disk import Content, Directory, accept_all_directories
-from swh.model.identifiers import CONTENT, DIRECTORY, parse_swhid, swhid
+from swh.model.identifiers import CoreSWHID, ObjectType
from .dashboard.dashboard import run_app
from .exceptions import InvalidDirectoryPath, error_response
from .model import Tree
from .plot import generate_sunburst
async def swhids_discovery(
swhids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
"""API Request to get information about the SoftWare Heritage persistent
IDentifiers (SWHIDs) given in input.
Args:
swhids: a list of SWHIDs
api_url: url for the API request
Returns:
A dictionary with:
key: SWHID searched
value:
value['known'] = True if the SWHID is found
value['known'] = False if the SWHID is not found
"""
endpoint = api_url + "known/"
chunk_size = 1000
requests = []
def get_chunk(swhids):
for i in range(0, len(swhids), chunk_size):
yield swhids[i : i + chunk_size]
async def make_request(swhids):
async with session.post(endpoint, json=swhids) as resp:
if resp.status != 200:
error_response(resp.reason, resp.status, endpoint)
return await resp.json()
if len(swhids) > chunk_size:
for swhids_chunk in get_chunk(swhids):
requests.append(asyncio.create_task(make_request(swhids_chunk)))
res = await asyncio.gather(*requests)
# concatenate list of dictionaries
return dict(itertools.chain.from_iterable(e.items() for e in res))
else:
return await make_request(swhids)
def directory_filter(
path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]]
) -> bool:
"""It checks if the path_name is matching with the patterns given in input.
It is also used as a `dir_filter` function when generating the directory
object from `swh.model.from_disk`
Returns:
False if the directory has to be ignored, True otherwise
"""
path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name)
for sre_pattern in exclude_patterns:
if sre_pattern.match(str(path)):
return False
return True
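# For illustration (hypothetical pattern): with exclude_patterns holding the
# compiled regex for "*/.git", directory_filter("repo/.git", exclude_patterns)
# returns False, while directory_filter("repo/src", exclude_patterns) returns True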
def get_subpaths(
path: Path, exclude_patterns: Iterable[Pattern[str]]
) -> Iterator[Tuple[Path, str]]:
"""Find the SoftWare Heritage persistent IDentifier (SWHID) of
the directories and files under a given path.
Args:
path: the root path
    Yields:
        pairs of: path, the corresponding SWHID
"""
def swhid_of(path):
if path.is_dir():
if exclude_patterns:
def dir_filter(dirpath, *args):
return directory_filter(dirpath, exclude_patterns)
else:
dir_filter = accept_all_directories
obj = Directory.from_disk(
path=bytes(path), dir_filter=dir_filter
).get_data()
- return swhid(DIRECTORY, obj)
+ return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"]))
else:
obj = Content.from_file(path=bytes(path)).get_data()
- return swhid(CONTENT, obj)
+ return str(
+ CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"])
+ )
dirpath, dnames, fnames = next(os.walk(path))
for node in itertools.chain(dnames, fnames):
sub_path = Path(dirpath).joinpath(node)
yield (sub_path, swhid_of(sub_path))
async def parse_path(
path: Path,
session: aiohttp.ClientSession,
api_url: str,
exclude_patterns: Iterable[Pattern[str]],
) -> Iterator[Tuple[str, str, bool]]:
"""Check if the sub paths of the given path are present in the
archive or not.
Args:
path: the source path
api_url: url for the API request
    Returns:
        a map containing tuples with: a subpath of the given path,
        the SWHID of the subpath and the result of the API call
"""
parsed_paths = dict(get_subpaths(path, exclude_patterns))
parsed_swhids = await swhids_discovery(
list(parsed_paths.values()), session, api_url
)
def unpack(tup):
subpath, swhid = tup
return (subpath, swhid, parsed_swhids[swhid]["known"])
return map(unpack, parsed_paths.items())
async def run(
config: Dict[str, Any],
root: str,
source_tree: Tree,
exclude_patterns: Iterable[Pattern[str]],
) -> None:
"""Start scanning from the given root.
It fills the source tree with the path discovered.
Args:
root: the root path to scan
api_url: url for the API request
"""
api_url = config["web-api"]["url"]
async def _scan(root, session, api_url, source_tree, exclude_patterns):
for path, obj_swhid, known in await parse_path(
root, session, api_url, exclude_patterns
):
- obj_type = parse_swhid(obj_swhid).object_type
+ obj_type = CoreSWHID.from_string(obj_swhid).object_type
- if obj_type == CONTENT:
+ if obj_type == ObjectType.CONTENT:
source_tree.add_node(path, obj_swhid, known)
- elif obj_type == DIRECTORY and directory_filter(path, exclude_patterns):
+ elif obj_type == ObjectType.DIRECTORY and directory_filter(
+ path, exclude_patterns
+ ):
source_tree.add_node(path, obj_swhid, known)
if not known:
await _scan(path, session, api_url, source_tree, exclude_patterns)
if config["web-api"]["auth-token"]:
headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
else:
headers = {}
async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
await _scan(root, session, api_url, source_tree, exclude_patterns)
def extract_regex_objs(
root_path: Path, patterns: Iterable[str]
) -> Iterator[Pattern[str]]:
"""Generates a regex object for each pattern given in input and checks if
the path is a subdirectory or relative to the root path.
Yields:
an SRE_Pattern object
"""
for pattern in patterns:
for path in glob.glob(pattern):
dirpath = Path(path)
if root_path not in dirpath.parents:
error_msg = (
f'The path "{dirpath}" is not a subdirectory or relative '
f'to the root directory path: "{root_path}"'
)
raise InvalidDirectoryPath(error_msg)
regex = fnmatch.translate((pattern))
yield re.compile(regex)
def scan(
config: Dict[str, Any],
root_path: str,
exclude_patterns: Iterable[str],
out_fmt: str,
interactive: bool,
):
"""Scan a source code project to discover files and directories already
present in the archive"""
sre_patterns = set()
if exclude_patterns:
sre_patterns = {
reg_obj for reg_obj in extract_regex_objs(Path(root_path), exclude_patterns)
}
source_tree = Tree(Path(root_path))
loop = asyncio.get_event_loop()
loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
if interactive:
root = Path(root_path)
directories = source_tree.get_directories_info(root)
figure = generate_sunburst(directories, root)
run_app(figure, source_tree)
else:
source_tree.show(out_fmt)
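A minimal end-to-end sketch of invoking the scanner after these changes (the
config values are assumptions; the URL mirrors the public Software Heritage
Web API):

    from swh.scanner.scanner import scan

    config = {
        "web-api": {
            "url": "https://archive.softwareheritage.org/api/1/",
            "auth-token": None,
        }
    }
    # scan "my-project" and print the result to stdout in text format
    scan(config, "my-project", exclude_patterns=(), out_fmt="text", interactive=False)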